Skip to content

Instantly share code, notes, and snippets.

@jpadilla
Created August 14, 2014 02:47
Show Gist options
  • Save jpadilla/ffd480dfece79510f55e to your computer and use it in GitHub Desktop.
Save jpadilla/ffd480dfece79510f55e to your computer and use it in GitHub Desktop.
SECRET_KEY=super-secret
DATABASE_URL=sqlite:///soundem.db
DEBUG=yes
import os
import jwt
import dotenv
from functools import wraps
from flask import Flask, jsonify, request, g, abort
from flask.ext.sqlalchemy import SQLAlchemy
from flask_cors import cross_origin
from passlib.context import CryptContext
from flask_sslify import SSLify
dotenv.read_dotenv()
app = Flask(__name__)
BASE_PATH = os.path.dirname(os.path.abspath(__file__))
# App Config
app.config.update(
DEBUG=(os.environ.get('DEBUG') == 'yes'),
SECRET_KEY=os.environ.get('SECRET_KEY'),
SQLALCHEMY_DATABASE_URI=os.environ.get('DATABASE_URL')
)
# Initialize database
db = SQLAlchemy(app)
# Initialize Flask-SSLify
sslify = SSLify(app, subdomains=True)
# Passlib Config
password_context = CryptContext(['pbkdf2_sha256'])
def make_password(raw_password):
return password_context.encrypt(raw_password)
def check_password(raw_password, password):
return password_context.verify(raw_password, password)
def generate_token(payload, secret):
return jwt.encode(payload, secret)
def decode_token(token, secret):
try:
return jwt.decode(token, secret)
except (jwt.ExpiredSignature, jwt.DecodeError):
return None
def auth_token_required(fn):
@wraps(fn)
def decorated(*args, **kwargs):
auth = request.headers.get('Authorization', None)
if auth is None:
raise abort(401, description='Authorization Required')
parts = auth.split()
if parts[0].lower() != 'jwt':
raise abort(401, description='Unsupported authorization type')
elif len(parts) == 1:
raise abort(401, description='Token missing')
elif len(parts) > 2:
raise abort(401, description='Token contains spaces')
user = User.find_by_token(parts[1])
if not user:
raise abort(401)
g.user = user
return fn(*args, **kwargs)
return decorated
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(255), unique=True)
password = db.Column(db.String(255))
def __init__(self, email, password):
self.email = email
self.set_password(password)
@classmethod
def create(cls, email, password):
user = User(email=email, password=password)
db.session.add(user)
db.session.commit()
return user
@classmethod
def find_by_email(cls, email):
return User.query.filter_by(email=email).first()
@classmethod
def find_by_token(cls, token):
payload = decode_token(token, app.config['SECRET_KEY'])
if not payload or 'id' not in payload:
return None
return User.query.filter_by(id=payload['id']).first()
def set_password(self, raw_password):
self.password = make_password(raw_password)
def check_password(self, raw_password):
return check_password(raw_password, self.password)
def get_auth_token(self):
payload = {
'id': self.id
}
return generate_token(payload, app.config['SECRET_KEY'])
class Artist(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80), unique=True)
bio = db.Column(db.Text())
def __init__(self, name, bio):
self.name = name
self.bio = bio
@classmethod
def get_all(cls):
return Artist.query.all()
@classmethod
def get(cls, artist_id):
return Artist.query.filter_by(id=artist_id).first()
class Album(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80))
artwork_url = db.Column(db.String(255))
artist_id = db.Column(db.Integer, db.ForeignKey('artist.id'))
artist = db.relationship('Artist',
backref=db.backref('albums', lazy='dynamic'))
def __init__(self, name, artist, artwork_url=None):
self.name = name
self.artist = artist
if artwork_url:
self.artwork_url = artwork_url
@classmethod
def get_all(cls):
return Album.query.all()
@classmethod
def get(cls, album_id):
return Album.query.filter_by(id=album_id).first()
@classmethod
def total_count(cls):
return Album.query.count()
class Song(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80))
url = db.Column(db.String(255))
duration = db.Column(db.Integer)
album_id = db.Column(db.Integer, db.ForeignKey('album.id'))
album = db.relationship('Album',
backref=db.backref('songs', lazy='dynamic'))
def __init__(self, name, album, url=None, duration=None):
self.name = name
self.album = album
if url:
self.url = url
if duration:
self.duration = duration
@classmethod
def get_all(cls):
return Song.query.all()
@classmethod
def get_favorites(cls, user):
favorites = Favorite.query.filter_by(user=user)
song_ids = [favorite.song_id for favorite in favorites]
if song_ids:
return Song.filter_by_ids(song_ids)
return []
@classmethod
def filter_by_ids(cls, song_ids):
return Song.query.filter(Song.id.in_(song_ids))
@classmethod
def get(cls, song_id):
return Song.query.filter_by(id=song_id).first()
@classmethod
def total_count(cls):
return Song.query.count()
@classmethod
def total_duration(cls):
duration = 0
for song in Song.get_all():
duration += song.duration
return duration
def set_favorite(self, user, favorite):
if favorite is True:
return self.favorite(user)
if favorite is False:
return self.unfavorite(user)
def favorite(self, user):
favorite = Favorite.query.filter_by(song=self, user=user).first()
is_favorited = True
if not favorite:
favorite = Favorite(song=self, user=user)
db.session.add(favorite)
db.session.commit()
return is_favorited
def unfavorite(self, user):
favorite = Favorite.query.filter_by(song=self, user=user).first()
is_favorited = False
if favorite:
db.session.delete(favorite)
db.session.commit()
return is_favorited
def is_favorited(self, user):
favorite = Favorite.query.filter_by(song=self, user=user).first()
return True if favorite else False
class Favorite(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
user = db.relationship('User',
backref=db.backref('favorites', lazy='dynamic'))
song_id = db.Column(db.Integer, db.ForeignKey('song.id'))
song = db.relationship('Song',
backref=db.backref('favorites', lazy='dynamic'))
def __init__(self, song, user):
self.song = song
self.user = user
@app.route('/api/v1/login', methods=['POST'])
@cross_origin(headers=['Content-Type', 'Authorization'])
def login():
data = request.get_json() or {}
email = data.get('email')
password = data.get('password')
errors = {}
if not email:
errors['email'] = 'Field is required.'
if not password:
errors['password'] = 'Field is required.'
user = User.find_by_email(email)
if not user:
errors['email'] = 'User does not exist.'
elif not user.check_password(password):
errors['password'] = 'Invalid password.'
if errors:
return jsonify({'errors': errors}), 400
user_data = {
'id': user.id,
'email': user.email,
'token': user.get_auth_token()
}
return jsonify({'user': user_data})
@app.route('/api/v1/register', methods=['POST'])
@cross_origin(headers=['Content-Type', 'Authorization'])
def register():
data = request.get_json() or {}
email = data.get('email')
password = data.get('password')
errors = {}
if not email:
errors['email'] = 'Field is required.'
if not password:
errors['password'] = 'Field is required.'
existing_user = User.find_by_email(email)
if existing_user:
errors['email'] = 'Email is already taken'
if errors:
return jsonify({'errors': errors}), 400
user = User.create(email=email, password=password)
user_data = {
'id': user.id,
'email': user.email,
'token': user.get_auth_token()
}
return jsonify({'user': user_data}), 201
@app.route('/api/v1/artists', methods=['GET'])
@cross_origin(headers=['Content-Type', 'Authorization'])
@auth_token_required
def get_artists():
artists_results = []
for artist in Artist.get_all():
artists_results.append({
'id': artist.id,
'name': artist.name,
'bio': artist.bio,
'albums': [album.id for album in artist.albums.all()]
})
return jsonify({'artists': artists_results})
@app.route('/api/v1/artists/<int:artist_id>', methods=['GET'])
@cross_origin(headers=['Content-Type', 'Authorization'])
@auth_token_required
def get_artist(artist_id):
artist = Artist.get(artist_id)
if not artist:
abort(404)
artist_data = {
'id': artist.id,
'name': artist.name,
'bio': artist.bio,
'albums': [album.id for album in artist.albums.all()]
}
return jsonify({'artist': artist_data})
@app.route('/api/v1/albums', methods=['GET'])
@cross_origin(headers=['Content-Type', 'Authorization'])
@auth_token_required
def get_albums():
albums_results = []
for album in Album.get_all():
albums_results.append({
'id': album.id,
'name': album.name,
'artworkURL': album.artwork_url,
'artist': album.artist_id,
'songs': [song.id for song in album.songs.all()]
})
return jsonify({'albums': albums_results})
@app.route('/api/v1/albums/<int:album_id>', methods=['GET'])
@cross_origin(headers=['Content-Type', 'Authorization'])
@auth_token_required
def get_album(album_id):
album = Album.get(album_id)
if not album:
abort(404)
album_data = {
'id': album.id,
'name': album.name,
'artworkURL': album.artwork_url,
'artist': album.artist_id,
'songs': [song.id for song in album.songs.all()]
}
return jsonify({'album': album_data})
@app.route('/api/v1/songs', methods=['GET'])
@cross_origin(headers=['Content-Type', 'Authorization'])
@auth_token_required
def get_songs():
songs_results = []
favorite = request.args.get('favorite')
song_ids = request.args.getlist('ids[]')
if favorite == 'true':
songs = Song.get_favorites(g.user)
elif song_ids:
songs = Song.filter_by_ids(song_ids)
else:
songs = Song.get_all()
for song in songs:
songs_results.append({
'id': song.id,
'name': song.name,
'album': song.album.id,
'favorite': song.is_favorited(g.user),
'duration': song.duration,
'url': song.url
})
return jsonify({'songs': songs_results})
@app.route('/api/v1/songs/<int:song_id>', methods=['GET', 'PUT'])
@cross_origin(headers=['Content-Type', 'Authorization'])
@auth_token_required
def song(song_id):
song = Song.get(song_id)
is_favorited = None
if not song:
abort(404)
if request.method == 'PUT':
data = request.get_json() or {}
data_song = data.get('song') or {}
favorite = data_song.get('favorite')
if favorite is not None:
# Update song if favorite param was sent
is_favorited = song.set_favorite(g.user, favorite)
else:
song = Song.get(song_id)
if is_favorited is None:
# Check if song was favorited
is_favorited = song.is_favorited(g.user)
song_data = {
'id': song.id,
'name': song.name,
'album': song.album.id,
'favorite': is_favorited,
'duration': song.duration,
'url': song.url
}
return jsonify({'song': song_data})
@app.route('/api/v1/users/<int:user_id>', methods=['GET'])
@cross_origin(headers=['Content-Type', 'Authorization'])
@auth_token_required
def user(user_id):
user = g.user
if user.id != user_id:
abort(403)
user_data = {
'id': user.id,
'email': user.email,
'songTotal': Song.total_count(),
'albumTotal': Album.total_count(),
'durationTotal': Song.total_duration()
}
return jsonify({'user': user_data})
if __name__ == '__main__':
app.run(host='0.0.0.0', debug=True)
Flask==0.10.1
Flask-Cors==1.6.0
Flask-SQLAlchemy==1.0
Flask-SSLify==0.1.4
Flask-Script==2.0.5
Jinja2==2.7.3
MarkupSafe==0.23
PyJWT==0.2.1
SQLAlchemy==0.9.7
Werkzeug==0.9.6
django-dotenv==1.2
gunicorn==19.1.0
itsdangerous==0.24
passlib==1.6.2
pycparser==2.10
six==1.7.3
wsgiref==0.1.2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment