Last active
June 25, 2016 21:46
-
-
Save Akramz/85f7434b30a8c402e422e24859499660 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import csv | |
import sqlite3 | |
import numpy as np | |
import pandas as pd | |
from flask import Flask,render_template | |
from flask import request, g | |
from flask import jsonify | |
from collections import Counter | |
# Path of the SQLite database file; read back below through app.config.
DATABASE = '/var/www/html/nuweth/data/DB.db'

# Create the application object and load this module's upper-case names
# (here: DATABASE) into its configuration.
app = Flask(__name__)
app.config.from_object(__name__)
## ROUTES ##


@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    landing_page = render_template("index.html")
    return landing_page
# API
def _read_weather_frame(location, time):
    """Load the CSV data backing one ``/api`` request.

    For ``time == 'future'`` the two prediction files of the location
    (``<location>.csv`` followed by ``<location>_.csv``) are concatenated;
    for any other value the single history file is read.

    Raises:
        IOError: if a required CSV file does not exist or cannot be read.
    """
    if time == 'future':
        prefix = '/home/ubuntu/nuweth/predictions/' + location
        return pd.concat([pd.read_csv(prefix + '.csv'),
                          pd.read_csv(prefix + '_.csv')])
    return pd.read_csv('/home/ubuntu/nuweth/data/' + location + '.csv')


@app.route('/api/')
@app.route('/api/<location>/')
@app.route('/api/<location>/<time>/')
@app.route('/api/<location>/<time>/<int:hours>')
def api(location=None, time=None, hours=None):
    """Return weather data for a location as JSON keyed by ``DateUTC``.

    Args:
        location: Base name of the CSV file(s) to read; ``None`` on ``/api/``.
        time: ``'future'`` for predictions or ``'past'`` for history.
        hours: Optional row limit. The first ``hours`` rows are kept for
            predictions, the last ``hours`` rows for history.

    Returns:
        A plain welcome string when no location is given, otherwise a JSON
        response. An unsupported or missing ``time`` and a location without
        data files both yield a JSON error body with status 404 instead of
        an unhandled server error.
    """
    if location is None:
        return 'WELCOME TO THE API DOCS'

    # Without this guard the view would fall through and return None,
    # which Flask reports as an internal server error.
    if time not in ('future', 'past'):
        return jsonify(error="time must be 'future' or 'past'"), 404

    try:
        frame = _read_weather_frame(location, time)
    except IOError:
        # Unknown location: the CSV file(s) are missing.
        return jsonify(error='no data available for this location'), 404

    if hours is not None:
        # Predictions run forward from the top of the file, history is
        # trimmed to its most recent rows at the bottom.
        frame = frame.head(hours) if time == 'future' else frame.tail(hours)

    records = frame.set_index('DateUTC').T.to_dict()
    return jsonify(**records)
def _load_prediction_frame(csv_path):
    """Read one predictions CSV and add the derived time columns.

    ``DateUTC_`` is ``DateUTC`` parsed as ``%Y-%m-%d %H:%M:%S`` and ``Hour``
    is the hour component of that timestamp.
    """
    frame = pd.read_csv(csv_path)
    frame['DateUTC_'] = pd.to_datetime(frame.DateUTC, format='%Y-%m-%d %H:%M:%S')
    frame['Hour'] = frame.DateUTC_.dt.hour
    return frame


def _prediction_records_json(frame):
    """Serialise a predictions frame as a JSON array of row records."""
    return frame.reset_index().to_json(
        path_or_buf=None,
        orient='records',
        date_format='epoch',
        double_precision=10,
        force_ascii=True,
        date_unit='ms',
        default_handler=None,
    )


@app.route('/platform/<city>')
def platform(city):
    """Render the forecast page for ``city``.

    Reads ``<city>.csv`` and ``<city>_.csv`` from the predictions directory
    and passes both to ``platform.html`` as JSON records, together with
    their ``Hour``/``CON`` columns.

    Returns:
        The rendered template, or a plain-text 404 response when the
        prediction files for ``city`` cannot be read (previously an
        unhandled exception).
    """
    prefix = '/home/ubuntu/nuweth/predictions/' + city
    try:
        one_day = _load_prediction_frame(prefix + '.csv')
        three_day = _load_prediction_frame(prefix + '_.csv')
    except IOError:
        return 'No predictions available for this city.', 404

    return render_template(
        "platform.html",
        one_day=_prediction_records_json(one_day),
        three_days=_prediction_records_json(three_day),
        conditions=one_day[['Hour', 'CON']],
        conditions_=three_day[['Hour', 'CON']],
        city=city,
    )
@app.route("/history")
def view_history():
    """Show every row of the ``history`` table, rows separated by ``<br>``."""
    history_rows = execute_query("""SELECT * FROM history""")
    rendered = [str(record) for record in history_rows]
    return '<br>'.join(rendered)
@app.route("/predictions")
def view_predictions():
    """Show every row of the ``predictions`` table, rows separated by ``<br>``."""
    prediction_rows = execute_query("""SELECT * FROM predictions""")
    rendered = [str(record) for record in prediction_rows]
    return '<br>'.join(rendered)
############
## DATABASE METHODS ##


def connect_to_database():
    """Open a new SQLite connection to the file named by ``app.config['DATABASE']``."""
    database_path = app.config['DATABASE']
    return sqlite3.connect(database_path)
def get_db():
    """Return the connection stored on ``g``, opening one on first use."""
    try:
        connection = g.db
    except AttributeError:
        connection = None

    if connection is not None:
        return connection

    # Nothing cached yet: open a connection and remember it on ``g``.
    g.db = connection = connect_to_database()
    return connection
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection stored on ``g``, if any, at app-context teardown.

    ``exception`` is supplied by Flask's teardown hook and is not used.
    """
    connection = getattr(g, 'db', None)
    if connection is None:
        return
    connection.close()
def execute_query(query, args=()):
    """Run ``query`` on the shared connection and return all result rows.

    Args:
        query: SQL statement to execute.
        args: Parameters bound to the statement's placeholders.

    Returns:
        The list of rows produced by ``fetchall()``.
    """
    cur = get_db().execute(query, args)
    try:
        return cur.fetchall()
    finally:
        # Close the cursor even when fetching fails.
        cur.close()
######################
if __name__ == '__main__':
    # Start the development server when the file is executed directly.
    # NOTE(review): debug=True turns on the interactive debugger; confirm
    # this entry point is never used for a publicly reachable deployment.
    run_options = {'debug': True, 'use_reloader': False}
    app.run(**run_options)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment