Created
November 18, 2017 04:23
-
-
Save mazz/ffb56fb99c1f26aa782e010563c9bb08 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import uuid | |
import re | |
import datetime | |
from base64 import b64decode | |
import time | |
import jwt | |
import logging | |
logger = logging.getLogger(__name__) | |
from pyramid.httpexceptions import exception_response | |
from pyramid.httpexceptions import HTTPOk | |
from sqlalchemy.dialects.postgresql import UUID | |
from cornice.service import Service | |
from websauna.system.http import Request | |
from websauna.system.core.route import simple_route | |
from websauna.system.user.utils import get_login_service | |
from websauna.system.user.models import User | |
from websauna.system.user.interfaces import AuthenticationFailure | |
from kjvrvg.token.jwt_token import JWTToken | |
from kjvrvg.token.jwt_token import pyJWTExpiredSignatureError | |
from kjvrvg.auth.authenticator import RequestAuthenticator | |
from kjvrvg.models import Book, Chapter, MediaChapter, MediaGospel, LanguageIdentifier, Music, MediaMusic, Church, MediaSermon | |
k_api_version = 1 | |
# k_supported_languages = ['en', 'fr', 'hi', 'pt', 'es'] | |
# Configure view named home at path / using a template kjvrvg/home.html | |
@simple_route("/", route_name="home", renderer='kjvrvg/home.html') | |
def home(request: Request): | |
"""Render site homepage.""" | |
return {"project": "kjvrvg"} | |
books = Service(name='v1/books', | |
path='v1/books', | |
renderer='json', | |
accept='application/json') | |
localized_media_chapter = Service(name='v1/books/{bid}/media', | |
path='v1/books/{bid}/media', | |
renderer='json', | |
accept='application/json') | |
music = Service(name='v1/music', | |
path='v1/music', | |
renderer='json', | |
accept='application/json') | |
localized_media_music = Service(name='v1/music/{mid}/media', | |
path='v1/music/{mid}/media', | |
renderer='json', | |
accept='application/json') | |
localized_media_gospel = Service(name='v1/gospel/media', | |
path='v1/gospel/media', | |
renderer='json', | |
accept='application/json') | |
supported_languages = Service(name='v1/languages/supported', | |
path='v1/languages/supported', | |
renderer='json', | |
accept='application/json') | |
church = Service(name='v1/church', | |
path='v1/church', | |
renderer='json', | |
accept='application/json') | |
media_sermon = Service(name='v1/church/{cid}/media/sermon', | |
path='v1/church/{cid}/media/sermon', | |
renderer='json', | |
accept='application/json') | |
languages = Service(name='v1/languages', | |
path='v1/languages', | |
renderer='json', | |
accept='application/json') | |
''' | |
disable contact-us because it could be spammed | |
contactus = Service(name='v1/contact-us', | |
path='v1/contact-us', | |
renderer='json', | |
accept='application/json') | |
''' | |
''' | |
echo '{"Authentication": "Basic bWljaGFlbGtoYW5uYUBnbWFpbC5jb206bWljaGFlbGtoYW5uYQ=="}' | http localhost:6543/v1/authenticate | |
echo '{"Authentication": "Basic bWljaGFlbGtoYW5uYUBnbWFpbC5jb206bWljaGFlbGtoYW5uYQo="}' | http https://japheth.ca/v1/authenticate | |
''' | |
authenticate = Service(name='v1/authenticate', | |
path='v1/authenticate', | |
renderer='json', | |
accept='application/json') | |
''' | |
echo '{"Authentication": "JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJzdWIiOiI0ODg2MTZjOS02MGI0LTQ2NDktYWE2MC00Yzg3M2U2YjAzMjYiLCJpYXQiOjE0OTI5ODczNTQsImV4cCI6MTQ5Mjk4ODI1NH0.-FJ-f627Oc3hBtOH7RPGLkjktdawNcLW4nyOacf7zbZakzPA7_nDuE5wVDFmu6s8mb6-XyY0Ayflc3CpMn94nA"}' | http localhost:6543/v1/logout | |
''' | |
logout = Service(name='v1/logout', | |
path='v1/logout', | |
renderer='json', | |
accept='application/json') | |
''' | |
echo '{"Authentication": "JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJpYXQiOjE0OTI5MTgzODQsImV4cCI6MTQ5MjkxODM5NCwic3ViIjoiMWZhMDgxNTAtNjA2Ni00MWRmLTgwOWItMWM3NmQwNmE4MjAyIn0.MWkRy2TdOyNBuJtJgW2yRN6TGztsn5yqvABsz4g-ZhvWPuEJUCMViWUB-h2Vn-tD8ItNXVIdrmh2B8nbBvN68w"}' | http localhost:6543/v1/token/refresh | |
''' | |
token_refresh = Service(name='v1/token/refresh', | |
path='v1/token/refresh', | |
renderer='json', | |
accept='application/json') | |
k_authenticate_auth_types = ['Basic'] | |
@authenticate.post(content_type="application/json") | |
def post_authenticate(request: Request): | |
user = None | |
auth_exception = None | |
if request.method == "POST": | |
# print(request.json_body) | |
credentials = request.json_body['Authentication'] | |
# print('credentials: {}'.format(credentials)) | |
tokens = None | |
response = None | |
try: | |
tokens = credentials.split(' ') | |
except Exception as e: | |
# 400 - HTTPBadRequest | |
response = exception_response(400) | |
return response | |
print('tokens: {}'.format(repr(tokens))) | |
if tokens[0] in k_authenticate_auth_types: | |
try: | |
userpassslug = b64decode(tokens[1]).decode() | |
except Exception as e: | |
response = exception_response(500) | |
return response | |
# print('userpassslug: {}'.format(userpassslug)) | |
userpassslug = userpassslug.strip(' \t\n\r') | |
try: | |
userpass = userpassslug.split(':') | |
except Exception as e: | |
return dict(status='failure', message = repr(e)) | |
# print('username: {} password: {}'.format(userpass[0], userpass[1])) | |
login_service = get_login_service(request) | |
try: | |
# user = login_service.check_credentials(username=userpass[0], password=userpass[1]) | |
user = login_service.check_credentials(username=userpass[0], password=userpass[1]) | |
except AuthenticationFailure as af: | |
logger.info('AuthenticationFailure: {}'.format(repr(af))) | |
auth_exception = af | |
response = exception_response(403) | |
return response | |
else: | |
logger.info('else') | |
else: | |
# not supported auth, so return failure | |
# 400 - HTTPBadRequest | |
response = exception_response(400) | |
return response | |
else: | |
# 405 - HTTPMethodNotAllowed | |
response = exception_response(405) | |
return response | |
if user and user.is_activated(): | |
# TODO set JWT k_token_expiration in .ini app settings | |
# logging.debug('user.is_activated: {}'.format(repr(user.is_activated()))) | |
pem_file = request.registry.settings['websauna.pem_file'] | |
token_expiration = int(request.registry.settings['websauna.jwt_token_expiration']) | |
logger.info('token_expiration: {}'.format(repr(token_expiration))) | |
jwt_token = JWTToken(pem_file, token_expiration) | |
token = jwt_token.generate(str(user.uuid), is_admin=user.is_admin()) | |
logger.info('token: {}'.format(repr(token))) | |
claims = jwt.decode(token, jwt_token.private_key, algorithm=[jwt_token.algorithm], leeway=jwt_token.leeway) | |
# logging.debug('token claims: {}'.format(repr(claims))) | |
tokendict = dict(message='success', token = token) | |
# response = Response() | |
response = HTTPOk(body=json.dumps(tokendict)) | |
return response | |
else: | |
# 405 - HTTPMethodNotAllowed | |
response = exception_response(405) | |
return response | |
# return dict(status='failure', message = repr(auth_exception)) | |
@logout.post(content_type="application/json") | |
def post_logout(request: Request): | |
user = None | |
auth_exception = None | |
if request.method == "POST": | |
claims = {} | |
pem_file = request.registry.settings['websauna.pem_file'] | |
token_expiration = int(request.registry.settings['websauna.jwt_token_expiration']) | |
logger.info('token_expiration: {}'.format(repr(token_expiration))) | |
token = JWTToken(pem_file, token_expiration) | |
(authentication, auth_exception) = RequestAuthenticator.parse_auth(request) | |
# logger.info('(authentication, auth_exception): {}, {}'.format(repr(authentication), repr(auth_exception))) | |
if auth_exception is not None: | |
return auth_exception | |
if authentication is not None: | |
(validated_claims, claims_exception, http_exception) = token.validate(authentication) | |
logger.debug('(validated_claims, http_exception): {}, {}'.format(repr(validated_claims), repr(http_exception))) | |
if http_exception is not None: | |
return http_exception | |
else: | |
claims = validated_claims | |
dbsession = request.dbsession | |
logger.info('dbsession: {}'.format(repr(dbsession))) | |
user = dbsession.query(User).filter_by(uuid=claims['sub']).first() | |
# logger.info('user: {}'.format(repr(user))) | |
if user is not None: | |
login_service = get_login_service(request) | |
return login_service.logout() | |
else: | |
# 405 - HTTPMethodNotAllowed | |
return exception_response(405) | |
@token_refresh.post(content_type="application/json") | |
def post_token_refresh(request: Request): | |
response = None | |
if request.method == "POST": | |
# logger.info('request: {}'.format(repr(request))) | |
claims = None | |
claims_exception = None | |
validated_token = None | |
http_exception = None | |
pem_file = request.registry.settings['websauna.pem_file'] | |
token_expiration = int(request.registry.settings['websauna.jwt_token_expiration']) | |
# logger.info('token_expiration: {}'.format(repr(token_expiration))) | |
token = JWTToken(pem_file, token_expiration) | |
(authentication, auth_exception) = RequestAuthenticator.parse_auth(request) | |
# logger.info('(authentication, auth_exception): {}, {}'.format(repr(authentication), repr(auth_exception))) | |
if auth_exception is not None: | |
return auth_exception | |
if authentication is not None: | |
(validated_token, claims_exception, http_exception) = token.validate(authentication) | |
# logger.info('(validated_token, http_exception): {}, {}'.format(repr(validated_token), repr(http_exception))) | |
if claims_exception is not None: | |
# logger.info('(http_exception type): {}'.format(repr(type(http_exception).__name__))) | |
if type(claims_exception).__name__ == pyJWTExpiredSignatureError: | |
(refresh_token, claims_exception) = token.refresh(authentication, 1000) | |
# logger.info('refresh_token: {}'.format(repr(refresh_token))) | |
refresh_claims = jwt.decode(refresh_token, token.private_key, algorithm=[token.algorithm], leeway=token.leeway) | |
# logging.debug('post_token_refresh claims: {}'.format(repr(refresh_claims))) | |
tokendict = dict(message='success', token = refresh_token) | |
# response = Response() | |
response = HTTPOk(body=json.dumps(tokendict)) | |
return response | |
@books.get(content_type="application/json") | |
def get_books(request: Request): | |
''' get all books names/books names and their | |
corresponding book ids | |
api call example: /v1/books | |
''' | |
if request.method == "GET": | |
# logger.info('request: {}'.format(request)) | |
dbsession = request.dbsession | |
# sql query all books of the New Testament | |
# (whole database for now) | |
books = dbsession.query(Book).order_by(Book.absolute_id.asc()) | |
book_list = [] | |
''' iterate over book tuples and extract | |
the title and the book id ''' | |
for idx, row in enumerate(books): | |
book_dict = {} | |
# logger.info('row: {}'.format(repr(row))) | |
book_dict['title'] = row.basename | |
book_dict['bid'] = str(row.uuid) | |
book_list.append(book_dict) | |
response_dict = dict(status = 'success', result = book_list, version = '{}'.format(str(k_api_version))) | |
# response = Response() | |
response = HTTPOk(body=json.dumps(response_dict)) | |
# response = HTTPOk(body=json.dumps(book_list)) | |
return response | |
else: | |
# 405 - HTTPMethodNotAllowed | |
return exception_response(405) | |
def has_language_id(request, **kwargs): | |
if not 'Language-Id' in request.headers: | |
request.errors.add('header', 'Language-Id', 'You need to provide an ISO-639 language identifier') | |
@localized_media_chapter.get(content_type="application/json", validators=has_language_id) | |
def get_localized_media_chapter(request: Request): | |
''' get all mediachapter for this book uuid. user must pass ISO-639 language-id | |
in the header. i.e. Language-Id:'es' | |
api call example: /v1/books/e931ea58-080f-46ee-ae21-3bbec0365ddc/media | |
''' | |
if request.method == "GET": | |
# logger.info('request: {}'.format(request)) | |
# logger.info('request.headers: {}'.format(request.headers)) | |
# logger.info('request.matchdict: {}'.format(request.matchdict)) | |
bid = request.matchdict['bid'] | |
if bid is not None: | |
logger.info('bid: {}'.format(bid)) | |
try: | |
book_uuid = uuid.UUID(bid) | |
logger.info('book_uuid: {}'.format(book_uuid)) | |
except Exception as e: | |
return exception_response(401) | |
logger.info('e: {}'.format(e)) | |
dbsession = request.dbsession | |
# logger.debug('lang-id: {}'.format(request.headers['Language-Id'])) | |
language_id = __parse_language_id(request.headers['Language-Id']) | |
''' filter-out requests that ask for languages that are not supported | |
TODO: enable before we ship | |
if language_id not in __supported_languages(request): | |
# logger.info('{} not in {}'.format(language_id, repr(k_supported_languages))) | |
language_id = 'en' ''' | |
''' sql join MediaChapter, Chapter and Book to find all mediachapter where | |
book uuid matches the book uuid and the language id | |
of the mediachapter matches the Language-Id in the header ''' | |
mediachapter = dbsession.query(MediaChapter).join(Chapter).join(Book).filter(Book.uuid == book_uuid.hex).filter(MediaChapter.language_id == language_id).order_by(MediaChapter.id).all() | |
media_chapter_list = [] | |
''' iterate over the mediachapter tuples and | |
extract the localizedName and path to the | |
audio file ''' | |
for idx, row in enumerate(mediachapter): | |
media_chapter_dict = {} | |
# logger.info('row: {}'.format(repr(row))) | |
media_chapter_dict['localizedName'] = row.localizedname | |
media_chapter_dict['path'] = row.path | |
media_chapter_dict['presenterName'] = row.presenter_name | |
media_chapter_dict['sourceMaterial'] = row.source_material | |
media_chapter_list.append(media_chapter_dict) | |
response_dict = dict(status='success', result = media_chapter_list, version = '{}'.format(str(k_api_version))) | |
response = HTTPOk(body=json.dumps(response_dict)) | |
return response | |
else: | |
# 405 - HTTPMethodNotAllowed | |
return exception_response(405) | |
def __parse_language_id(language_id): | |
language_list = language_id.split('-') | |
# logger.info('language_list: {}'.format(language_list)) | |
language_id = language_list[0] | |
if language_id is None: | |
language_id = '' | |
return language_id | |
def __validate_email(email): | |
found = None | |
regex = re.compile('[^@\s]+@[^@\s]+\.[a-zA-Z0-9]+$') | |
found = regex.search(email) | |
if found is not None: | |
return True | |
else: | |
return False | |
def __supported_languages(request: Request) -> list: | |
dbsession = request.dbsession | |
supported_languages = dbsession.query(LanguageIdentifier).filter(LanguageIdentifier.supported == True).order_by(LanguageIdentifier.identifier.asc()).all() | |
supported_languages_list = [] | |
for idx, row in enumerate(supported_languages): | |
supported_languages_list.append(row.identifier) | |
return supported_languages_list | |
@music.get(content_type="application/json") | |
def get_music(request: Request): | |
''' get all music names/music names and their | |
corresponding music ids | |
api call example: /v1/music | |
''' | |
if request.method == "GET": | |
# logger.info('request: {}'.format(request)) | |
dbsession = request.dbsession | |
# sql query all music | |
# (whole database for now) | |
music = dbsession.query(Music).order_by(Music.id.asc()) | |
music_list = [] | |
''' iterate over music tuples and extract | |
the title and the music id ''' | |
for idx, row in enumerate(music): | |
music_dict = {} | |
# logger.info('row: {}'.format(repr(row))) | |
music_dict['title'] = row.basename | |
music_dict['mid'] = str(row.uuid) | |
music_list.append(music_dict) | |
response_dict = dict(status = 'success', result = music_list, version = '{}'.format(str(k_api_version))) | |
# response = Response() | |
response = HTTPOk(body=json.dumps(response_dict)) | |
# response = HTTPOk(body=json.dumps(music_list)) | |
return response | |
else: | |
# 405 - HTTPMethodNotAllowed | |
return exception_response(405) | |
@localized_media_music.get(content_type="application/json") | |
def get_localized_media_music(request: Request): | |
''' get all mediamusic for this music uuid. | |
api call example: /v1/music/e931ea58-080f-46ee-ae21-3bbec0365ddc/media | |
''' | |
if request.method == "GET": | |
# logger.info('request: {}'.format(request)) | |
# logger.info('request.headers: {}'.format(request.headers)) | |
# logger.info('request.matchdict: {}'.format(request.matchdict)) | |
mid = request.matchdict['mid'] | |
if mid is not None: | |
logger.info('mid: {}'.format(mid)) | |
try: | |
music_uuid = uuid.UUID(mid) | |
logger.info('music_uuid: {}'.format(music_uuid)) | |
except Exception as e: | |
return exception_response(401) | |
logger.info('e: {}'.format(e)) | |
dbsession = request.dbsession | |
# logger.debug('lang-id: {}'.format(request.headers['Language-Id'])) | |
''' sql join MediaMusic, and Music to find all mediamusic where | |
music uuid matches the music uuid and the language id | |
of the mediamusic matches the Language-Id in the header ''' | |
mediamusic = dbsession.query(MediaMusic).join(Music).filter(Music.uuid == music_uuid.hex).order_by(MediaMusic.id).all() | |
media_music_list = [] | |
''' iterate over the mediamusic tuples and | |
extract the localizedName and path to the | |
audio file ''' | |
for idx, row in enumerate(mediamusic): | |
media_music_dict = {} | |
# logger.info('row: {}'.format(repr(row))) | |
media_music_dict['localizedName'] = row.localizedname | |
media_music_dict['path'] = row.path | |
media_music_dict['presenterName'] = row.presenter_name | |
media_music_dict['sourceMaterial'] = row.source_material | |
media_music_list.append(media_music_dict) | |
response_dict = dict(status='success', result = media_music_list, version = '{}'.format(str(k_api_version))) | |
response = HTTPOk(body=json.dumps(response_dict)) | |
return response | |
else: | |
# 405 - HTTPMethodNotAllowed | |
return exception_response(405) | |
@church.get(content_type="application/json") | |
def get_church(request: Request): | |
''' get all church names and their | |
corresponding church ids -- generally should only be one | |
in the entire db | |
api call example: /v1/church | |
''' | |
if request.method == "GET": | |
# logger.info('request: {}'.format(request)) | |
dbsession = request.dbsession | |
# sql query all church | |
# (whole database for now) | |
church = dbsession.query(Church).order_by(Church.id.asc()) | |
church_list = [] | |
''' iterate over church tuples and extract | |
the title and the church id ''' | |
for idx, row in enumerate(church): | |
church_dict = {} | |
# logger.info('row: {}'.format(repr(row))) | |
church_dict['name'] = row.church_name | |
church_dict['cid'] = str(row.uuid) | |
church_list.append(church_dict) | |
response_dict = dict(status = 'success', result = church_list, version = '{}'.format(str(k_api_version))) | |
# response = Response() | |
response = HTTPOk(body=json.dumps(response_dict)) | |
# response = HTTPOk(body=json.dumps(church_list)) | |
return response | |
else: | |
# 405 - HTTPMethodNotAllowed | |
return exception_response(405) | |
@media_sermon.get(content_type="application/json") | |
def get_media_sermon(request: Request): | |
''' get all mediamusic for this music uuid. | |
api call example: /v1/church/e931ea58-080f-46ee-ae21-3bbec0365ddc/media/sermon | |
''' | |
if request.method == "GET": | |
# logger.info('request: {}'.format(request)) | |
# logger.info('request.headers: {}'.format(request.headers)) | |
# logger.info('request.matchdict: {}'.format(request.matchdict)) | |
cid = request.matchdict['cid'] | |
if cid is not None: | |
logger.info('cid: {}'.format(cid)) | |
try: | |
church_uuid = uuid.UUID(cid) | |
logger.info('church_uuid: {}'.format(church_uuid)) | |
except Exception as e: | |
return exception_response(401) | |
logger.info('e: {}'.format(e)) | |
dbsession = request.dbsession | |
# logger.debug('lang-id: {}'.format(request.headers['Language-Id'])) | |
''' sql join MediaMusic, and Music to find all mediamusic where | |
music uuid matches the music uuid and the language id | |
of the mediamusic matches the Language-Id in the header ''' | |
mediasermon = dbsession.query(MediaSermon).join(Church).filter(Church.uuid == church_uuid.hex).order_by(MediaSermon.id).all() | |
media_sermon_list = [] | |
''' iterate over the mediasermon tuples and | |
extract the localizedName and path to the | |
audio file ''' | |
for idx, row in enumerate(mediasermon): | |
media_sermon_dict = {} | |
# logger.info('row: {}'.format(repr(row))) | |
media_sermon_dict['title'] = row.title | |
media_sermon_dict['path'] = row.path | |
media_sermon_dict['presenterName'] = '{} {} {}'.format(row.leader.title, row.leader.first_name, row.leader.last_name) | |
media_sermon_dict['churchName'] = row.church.church_name | |
media_sermon_dict['datePresented'] = str(row.date_presented) | |
media_sermon_list.append(media_sermon_dict) | |
response_dict = dict(status='success', result = media_sermon_list, version = '{}'.format(str(k_api_version))) | |
response = HTTPOk(body=json.dumps(response_dict)) | |
return response | |
else: | |
# 405 - HTTPMethodNotAllowed | |
return exception_response(405) | |
@localized_media_gospel.get(content_type="application/json") | |
def get_localized_media_gospel(request: Request): | |
''' get all mediagospel for this gospel uuid. | |
user optionally passes ISO-639 language-id | |
in the header. i.e. Language-Id:'es' | |
api call example: /v1/gospel/media | |
''' | |
if request.method == "GET": | |
# logger.info('request: {}'.format(request)) | |
# logger.info('request.headers: {}'.format(request.headers)) | |
# logger.info('request.matchdict: {}'.format(request.matchdict)) | |
dbsession = request.dbsession | |
mediagospel = None | |
if 'Language-Id' in request.headers: | |
language_id = __parse_language_id(request.headers['Language-Id']) | |
''' filter-out requests that ask for languages that are not supported | |
TODO: enable before we ship | |
if language_id not in __supported_languages(request): | |
# logger.info('{} not in {}'.format(language_id, repr(k_supported_languages))) | |
language_id = 'en' ''' | |
''' sql join MediaGospel, and Gospel to find all mediagospel where | |
gospel uuid matches the gospel uuid and the language id | |
of the mediagospel matches the Language-Id in the header ''' | |
mediagospel = dbsession.query(MediaGospel).filter(MediaGospel.language_id == language_id).order_by(MediaGospel.id).all() | |
else: | |
mediagospel = dbsession.query(MediaGospel).order_by(MediaGospel.id).all() | |
media_gospel_list = [] | |
''' iterate over the mediagospel tuples and | |
extract the localizedName and path to the | |
audio file ''' | |
for idx, row in enumerate(mediagospel): | |
media_gospel_dict = {} | |
# logger.info('row: {}'.format(repr(row))) | |
media_gospel_dict['localizedName'] = row.localizedname | |
media_gospel_dict['path'] = row.path | |
media_gospel_dict['presenterName'] = row.presenter_name | |
media_gospel_list.append(media_gospel_dict) | |
response_dict = dict(status='success', result = media_gospel_list, version = '{}'.format(str(k_api_version))) | |
response = HTTPOk(body=json.dumps(response_dict)) | |
return response | |
else: | |
# 405 - HTTPMethodNotAllowed | |
return exception_response(405) | |
@supported_languages.get(content_type="application/json") | |
def get_supported_languages(request: Request): | |
if request.method == "GET": | |
# logger.info('request: {}'.format(request)) | |
# logger.info('request.headers: {}'.format(request.headers)) | |
# logger.info('request.matchdict: {}'.format(request.matchdict)) | |
logger.info('__supported languages: {}'.format(repr(__supported_languages(request)))) | |
dbsession = request.dbsession | |
supported_languages = dbsession.query(LanguageIdentifier).filter(LanguageIdentifier.supported == True).order_by(LanguageIdentifier.identifier.asc()).all() | |
supported_languages_list = [] | |
''' iterate over the mediagospel tuples and | |
extract the localizedName and path to the | |
audio file ''' | |
for idx, row in enumerate(supported_languages): | |
supported_languages_dict = {} | |
# logger.info('row: {}'.format(repr(row))) | |
supported_languages_dict['languageIdentifier'] = row.identifier | |
supported_languages_dict['supported'] = row.supported | |
supported_languages_list.append(supported_languages_dict) | |
response_dict = dict(status='success', result = supported_languages_list, version = '{}'.format(str(k_api_version))) | |
response = HTTPOk(body=json.dumps(response_dict)) | |
return response | |
else: | |
# 405 - HTTPMethodNotAllowed | |
return exception_response(405) | |
@languages.get(content_type="application/json") | |
def get_languages(request: Request): | |
if request.method == "GET": | |
# logger.info('request: {}'.format(request)) | |
# logger.info('request.headers: {}'.format(request.headers)) | |
# logger.info('request.matchdict: {}'.format(request.matchdict)) | |
dbsession = request.dbsession | |
languages = dbsession.query(LanguageIdentifier).order_by(LanguageIdentifier.identifier.asc()).all() | |
languages_list = [] | |
''' iterate over the languageidentifier tuples and | |
extract the languageIdentifier and supported to the | |
of the language''' | |
for idx, row in enumerate(languages): | |
languages_dict = {} | |
# logger.info('row: {}'.format(repr(row))) | |
languages_dict['languageIdentifier'] = row.identifier | |
languages_dict['supported'] = row.supported | |
languages_list.append(languages_dict) | |
response_dict = dict(status='success', result = languages_list, version = '{}'.format(str(k_api_version))) | |
response = HTTPOk(body=json.dumps(response_dict)) | |
return response | |
else: | |
# 405 - HTTPMethodNotAllowed | |
return exception_response(405) | |
''' | |
@contactus.post(content_type="application/json") | |
def post_contactus(request: Request): | |
name = None | |
message = None | |
email = None | |
signup = None | |
if request.method == "POST": | |
# logger.info('request name: {}'.format(request.json_body['name'])) | |
# logger.info('request message: {}'.format(request.json_body['message'])) | |
# logger.info('request email: {}'.format(request.json_body['email'])) | |
# logger.info('request signup: {}'.format(request.json_body['signup'])) | |
# body: {"email": "[email protected]", "name": "Foo Bar", "message": "This is a message"} | |
exception = None | |
authentication = None | |
try: | |
name = request.json_body['name'] | |
message = request.json_body['message'] | |
email = request.json_body['email'] | |
signup = request.json_body['signup'] | |
except Exception as e: | |
logger.debug('exception: {}'.format(repr(e))) | |
return exception_response(401) | |
if email is not None: | |
email = email.replace(' ', '') | |
if __validate_email(email) is True: | |
logger.info('# TODO: send email') | |
response_dict = dict(status='success', version = '{}'.format(str(k_api_version))) | |
# response = Response() | |
response = HTTPOk(body=json.dumps(response_dict)) | |
return response | |
else: | |
# 405 - HTTPMethodNotAllowed | |
return exception_response(405) | |
''' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment