-
-
Save jpadilla/ffd480dfece79510f55e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SECRET_KEY=super-secret | |
DATABASE_URL=sqlite:///soundem.db | |
DEBUG=yes |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import jwt | |
import dotenv | |
from functools import wraps | |
from flask import Flask, jsonify, request, g, abort | |
from flask.ext.sqlalchemy import SQLAlchemy | |
from flask_cors import cross_origin | |
from passlib.context import CryptContext | |
from flask_sslify import SSLify | |
dotenv.read_dotenv() | |
app = Flask(__name__) | |
BASE_PATH = os.path.dirname(os.path.abspath(__file__)) | |
# App Config | |
app.config.update( | |
DEBUG=(os.environ.get('DEBUG') == 'yes'), | |
SECRET_KEY=os.environ.get('SECRET_KEY'), | |
SQLALCHEMY_DATABASE_URI=os.environ.get('DATABASE_URL') | |
) | |
# Initialize database | |
db = SQLAlchemy(app) | |
# Initialize Flask-SSLify | |
sslify = SSLify(app, subdomains=True) | |
# Passlib Config | |
password_context = CryptContext(['pbkdf2_sha256']) | |
def make_password(raw_password): | |
return password_context.encrypt(raw_password) | |
def check_password(raw_password, password): | |
return password_context.verify(raw_password, password) | |
def generate_token(payload, secret): | |
return jwt.encode(payload, secret) | |
def decode_token(token, secret): | |
try: | |
return jwt.decode(token, secret) | |
except (jwt.ExpiredSignature, jwt.DecodeError): | |
return None | |
def auth_token_required(fn): | |
@wraps(fn) | |
def decorated(*args, **kwargs): | |
auth = request.headers.get('Authorization', None) | |
if auth is None: | |
raise abort(401, description='Authorization Required') | |
parts = auth.split() | |
if parts[0].lower() != 'jwt': | |
raise abort(401, description='Unsupported authorization type') | |
elif len(parts) == 1: | |
raise abort(401, description='Token missing') | |
elif len(parts) > 2: | |
raise abort(401, description='Token contains spaces') | |
user = User.find_by_token(parts[1]) | |
if not user: | |
raise abort(401) | |
g.user = user | |
return fn(*args, **kwargs) | |
return decorated | |
class User(db.Model): | |
id = db.Column(db.Integer, primary_key=True) | |
email = db.Column(db.String(255), unique=True) | |
password = db.Column(db.String(255)) | |
def __init__(self, email, password): | |
self.email = email | |
self.set_password(password) | |
@classmethod | |
def create(cls, email, password): | |
user = User(email=email, password=password) | |
db.session.add(user) | |
db.session.commit() | |
return user | |
@classmethod | |
def find_by_email(cls, email): | |
return User.query.filter_by(email=email).first() | |
@classmethod | |
def find_by_token(cls, token): | |
payload = decode_token(token, app.config['SECRET_KEY']) | |
if not payload or 'id' not in payload: | |
return None | |
return User.query.filter_by(id=payload['id']).first() | |
def set_password(self, raw_password): | |
self.password = make_password(raw_password) | |
def check_password(self, raw_password): | |
return check_password(raw_password, self.password) | |
def get_auth_token(self): | |
payload = { | |
'id': self.id | |
} | |
return generate_token(payload, app.config['SECRET_KEY']) | |
class Artist(db.Model): | |
id = db.Column(db.Integer, primary_key=True) | |
name = db.Column(db.String(80), unique=True) | |
bio = db.Column(db.Text()) | |
def __init__(self, name, bio): | |
self.name = name | |
self.bio = bio | |
@classmethod | |
def get_all(cls): | |
return Artist.query.all() | |
@classmethod | |
def get(cls, artist_id): | |
return Artist.query.filter_by(id=artist_id).first() | |
class Album(db.Model): | |
id = db.Column(db.Integer, primary_key=True) | |
name = db.Column(db.String(80)) | |
artwork_url = db.Column(db.String(255)) | |
artist_id = db.Column(db.Integer, db.ForeignKey('artist.id')) | |
artist = db.relationship('Artist', | |
backref=db.backref('albums', lazy='dynamic')) | |
def __init__(self, name, artist, artwork_url=None): | |
self.name = name | |
self.artist = artist | |
if artwork_url: | |
self.artwork_url = artwork_url | |
@classmethod | |
def get_all(cls): | |
return Album.query.all() | |
@classmethod | |
def get(cls, album_id): | |
return Album.query.filter_by(id=album_id).first() | |
@classmethod | |
def total_count(cls): | |
return Album.query.count() | |
class Song(db.Model): | |
id = db.Column(db.Integer, primary_key=True) | |
name = db.Column(db.String(80)) | |
url = db.Column(db.String(255)) | |
duration = db.Column(db.Integer) | |
album_id = db.Column(db.Integer, db.ForeignKey('album.id')) | |
album = db.relationship('Album', | |
backref=db.backref('songs', lazy='dynamic')) | |
def __init__(self, name, album, url=None, duration=None): | |
self.name = name | |
self.album = album | |
if url: | |
self.url = url | |
if duration: | |
self.duration = duration | |
@classmethod | |
def get_all(cls): | |
return Song.query.all() | |
@classmethod | |
def get_favorites(cls, user): | |
favorites = Favorite.query.filter_by(user=user) | |
song_ids = [favorite.song_id for favorite in favorites] | |
if song_ids: | |
return Song.filter_by_ids(song_ids) | |
return [] | |
@classmethod | |
def filter_by_ids(cls, song_ids): | |
return Song.query.filter(Song.id.in_(song_ids)) | |
@classmethod | |
def get(cls, song_id): | |
return Song.query.filter_by(id=song_id).first() | |
@classmethod | |
def total_count(cls): | |
return Song.query.count() | |
@classmethod | |
def total_duration(cls): | |
duration = 0 | |
for song in Song.get_all(): | |
duration += song.duration | |
return duration | |
def set_favorite(self, user, favorite): | |
if favorite is True: | |
return self.favorite(user) | |
if favorite is False: | |
return self.unfavorite(user) | |
def favorite(self, user): | |
favorite = Favorite.query.filter_by(song=self, user=user).first() | |
is_favorited = True | |
if not favorite: | |
favorite = Favorite(song=self, user=user) | |
db.session.add(favorite) | |
db.session.commit() | |
return is_favorited | |
def unfavorite(self, user): | |
favorite = Favorite.query.filter_by(song=self, user=user).first() | |
is_favorited = False | |
if favorite: | |
db.session.delete(favorite) | |
db.session.commit() | |
return is_favorited | |
def is_favorited(self, user): | |
favorite = Favorite.query.filter_by(song=self, user=user).first() | |
return True if favorite else False | |
class Favorite(db.Model): | |
id = db.Column(db.Integer, primary_key=True) | |
user_id = db.Column(db.Integer, db.ForeignKey('user.id')) | |
user = db.relationship('User', | |
backref=db.backref('favorites', lazy='dynamic')) | |
song_id = db.Column(db.Integer, db.ForeignKey('song.id')) | |
song = db.relationship('Song', | |
backref=db.backref('favorites', lazy='dynamic')) | |
def __init__(self, song, user): | |
self.song = song | |
self.user = user | |
@app.route('/api/v1/login', methods=['POST']) | |
@cross_origin(headers=['Content-Type', 'Authorization']) | |
def login(): | |
data = request.get_json() or {} | |
email = data.get('email') | |
password = data.get('password') | |
errors = {} | |
if not email: | |
errors['email'] = 'Field is required.' | |
if not password: | |
errors['password'] = 'Field is required.' | |
user = User.find_by_email(email) | |
if not user: | |
errors['email'] = 'User does not exist.' | |
elif not user.check_password(password): | |
errors['password'] = 'Invalid password.' | |
if errors: | |
return jsonify({'errors': errors}), 400 | |
user_data = { | |
'id': user.id, | |
'email': user.email, | |
'token': user.get_auth_token() | |
} | |
return jsonify({'user': user_data}) | |
@app.route('/api/v1/register', methods=['POST']) | |
@cross_origin(headers=['Content-Type', 'Authorization']) | |
def register(): | |
data = request.get_json() or {} | |
email = data.get('email') | |
password = data.get('password') | |
errors = {} | |
if not email: | |
errors['email'] = 'Field is required.' | |
if not password: | |
errors['password'] = 'Field is required.' | |
existing_user = User.find_by_email(email) | |
if existing_user: | |
errors['email'] = 'Email is already taken' | |
if errors: | |
return jsonify({'errors': errors}), 400 | |
user = User.create(email=email, password=password) | |
user_data = { | |
'id': user.id, | |
'email': user.email, | |
'token': user.get_auth_token() | |
} | |
return jsonify({'user': user_data}), 201 | |
@app.route('/api/v1/artists', methods=['GET']) | |
@cross_origin(headers=['Content-Type', 'Authorization']) | |
@auth_token_required | |
def get_artists(): | |
artists_results = [] | |
for artist in Artist.get_all(): | |
artists_results.append({ | |
'id': artist.id, | |
'name': artist.name, | |
'bio': artist.bio, | |
'albums': [album.id for album in artist.albums.all()] | |
}) | |
return jsonify({'artists': artists_results}) | |
@app.route('/api/v1/artists/<int:artist_id>', methods=['GET']) | |
@cross_origin(headers=['Content-Type', 'Authorization']) | |
@auth_token_required | |
def get_artist(artist_id): | |
artist = Artist.get(artist_id) | |
if not artist: | |
abort(404) | |
artist_data = { | |
'id': artist.id, | |
'name': artist.name, | |
'bio': artist.bio, | |
'albums': [album.id for album in artist.albums.all()] | |
} | |
return jsonify({'artist': artist_data}) | |
@app.route('/api/v1/albums', methods=['GET']) | |
@cross_origin(headers=['Content-Type', 'Authorization']) | |
@auth_token_required | |
def get_albums(): | |
albums_results = [] | |
for album in Album.get_all(): | |
albums_results.append({ | |
'id': album.id, | |
'name': album.name, | |
'artworkURL': album.artwork_url, | |
'artist': album.artist_id, | |
'songs': [song.id for song in album.songs.all()] | |
}) | |
return jsonify({'albums': albums_results}) | |
@app.route('/api/v1/albums/<int:album_id>', methods=['GET']) | |
@cross_origin(headers=['Content-Type', 'Authorization']) | |
@auth_token_required | |
def get_album(album_id): | |
album = Album.get(album_id) | |
if not album: | |
abort(404) | |
album_data = { | |
'id': album.id, | |
'name': album.name, | |
'artworkURL': album.artwork_url, | |
'artist': album.artist_id, | |
'songs': [song.id for song in album.songs.all()] | |
} | |
return jsonify({'album': album_data}) | |
@app.route('/api/v1/songs', methods=['GET']) | |
@cross_origin(headers=['Content-Type', 'Authorization']) | |
@auth_token_required | |
def get_songs(): | |
songs_results = [] | |
favorite = request.args.get('favorite') | |
song_ids = request.args.getlist('ids[]') | |
if favorite == 'true': | |
songs = Song.get_favorites(g.user) | |
elif song_ids: | |
songs = Song.filter_by_ids(song_ids) | |
else: | |
songs = Song.get_all() | |
for song in songs: | |
songs_results.append({ | |
'id': song.id, | |
'name': song.name, | |
'album': song.album.id, | |
'favorite': song.is_favorited(g.user), | |
'duration': song.duration, | |
'url': song.url | |
}) | |
return jsonify({'songs': songs_results}) | |
@app.route('/api/v1/songs/<int:song_id>', methods=['GET', 'PUT']) | |
@cross_origin(headers=['Content-Type', 'Authorization']) | |
@auth_token_required | |
def song(song_id): | |
song = Song.get(song_id) | |
is_favorited = None | |
if not song: | |
abort(404) | |
if request.method == 'PUT': | |
data = request.get_json() or {} | |
data_song = data.get('song') or {} | |
favorite = data_song.get('favorite') | |
if favorite is not None: | |
# Update song if favorite param was sent | |
is_favorited = song.set_favorite(g.user, favorite) | |
else: | |
song = Song.get(song_id) | |
if is_favorited is None: | |
# Check if song was favorited | |
is_favorited = song.is_favorited(g.user) | |
song_data = { | |
'id': song.id, | |
'name': song.name, | |
'album': song.album.id, | |
'favorite': is_favorited, | |
'duration': song.duration, | |
'url': song.url | |
} | |
return jsonify({'song': song_data}) | |
@app.route('/api/v1/users/<int:user_id>', methods=['GET']) | |
@cross_origin(headers=['Content-Type', 'Authorization']) | |
@auth_token_required | |
def user(user_id): | |
user = g.user | |
if user.id != user_id: | |
abort(403) | |
user_data = { | |
'id': user.id, | |
'email': user.email, | |
'songTotal': Song.total_count(), | |
'albumTotal': Album.total_count(), | |
'durationTotal': Song.total_duration() | |
} | |
return jsonify({'user': user_data}) | |
if __name__ == '__main__': | |
app.run(host='0.0.0.0', debug=True) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Flask==0.10.1 | |
Flask-Cors==1.6.0 | |
Flask-SQLAlchemy==1.0 | |
Flask-SSLify==0.1.4 | |
Flask-Script==2.0.5 | |
Jinja2==2.7.3 | |
MarkupSafe==0.23 | |
PyJWT==0.2.1 | |
SQLAlchemy==0.9.7 | |
Werkzeug==0.9.6 | |
django-dotenv==1.2 | |
gunicorn==19.1.0 | |
itsdangerous==0.24 | |
passlib==1.6.2 | |
pycparser==2.10 | |
six==1.7.3 | |
wsgiref==0.1.2 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment