Scheduler in GAE (@WillianFuks, December 8, 2017)
from flask import Flask, request

import utils
from config import config
from connector.gcp import GCPService
from scheduler import SchedulerJob

app = Flask(__name__)

# Service connectors are created once per App Engine instance and
# shared across requests.
gcp_service = GCPService()
scheduler = SchedulerJob()
@app.route("/export_customers", methods=['POST'])
def export():
date = (None if request.form.get('date') == 'None' else
utils.process_url_date(request.form.get('date')))
query_job_body = utils.load_query_job_body(date,
**config)
job = gcp_service.bigquery.execute_job(config['general']['project_id'],
query_job_body)
gcp_service.bigquery.poll_job(job)
extract_job_body = utils.load_extract_job_body(date, **config)
gcp_service.bigquery.execute_job(config['general']['project_id'],
extract_job_body)
return "finished"
@app.route("/dataproc_dimsum", methods=['POST'])
def dataproc_dimsum():
extended_args = request.form.get('extended_args').split(',')
setup = config['jobs']['run_dimsum']
job = gcp_service.dataproc.build_cluster(**setup)
gcp_service.storage.upload_from_filenames(
**config['jobs']['run_dimsum']['pyspark_job'])
job = gcp_service.dataproc.submit_pyspark_job(extended_args,
**config['jobs']['run_dimsum'])
result = gcp_service.dataproc.delete_cluster(**setup)
scheduler.run({'url': '/prepare_datastore',
'target': config['jobs']['dataflow_export'][
'dataflow_service']})
return "finished"
@app.route("/prepare_datastore", methods=['POST'])
def prepare_datastore():
result = gcp_service.dataflow.run_template(**config['jobs'][
'dataflow_export'])
return "finished"