Skip to content

Instantly share code, notes, and snippets.

@ak64th
Last active December 15, 2015 05:33
Show Gist options
  • Save ak64th/8526fcf017a8b58b9eca to your computer and use it in GitHub Desktop.
Save ak64th/8526fcf017a8b58b9eca to your computer and use it in GitHub Desktop.
from flask import Flask
from flask_peewee.db import Database
# Flask application object that the other modules import.
app = Flask(__name__)

# Database settings, stored under app.config['DATABASE'] before the
# flask-peewee Database wrapper is built around the app.
DATABASE = dict(
    name='group_int.db',
    engine='peewee.SqliteDatabase',
    threadlocals=True,
)
app.config.update(DATABASE=DATABASE)

# Database wrapper; models elsewhere subclass ``db.Model``.
db = Database(app)
from app import app, db
from models import JsapiTicket
@app.route('/')
def index():
    """Show the stored ticket (row id 1) and when it was last modified.

    Returns a plain-text message; if no such row exists the text
    'No ticket now' is returned instead.
    """
    try:
        record = JsapiTicket.get(JsapiTicket.id == 1)
    except JsapiTicket.DoesNotExist:
        return 'No ticket now'
    details = (record.ticket, record.modified)
    return 'The current ticket: %s; Last refreshed at: %s' % details
# Expose the app's WSGI callable under the module-level name ``application``.
# NOTE(review): presumably this is the name uWSGI looks up when started with
# --wsgi-file=main.py -- confirm against the deployment command.
application = app.wsgi_app
from datetime import datetime
from peewee import *
from app import db
class Base(db.Model):
    """Parent model that carries ``created`` and ``modified`` timestamps."""

    # Both columns use ``datetime.now`` as their default callable.
    created = DateTimeField(default=datetime.now)
    modified = DateTimeField(default=datetime.now)

    def save(self, *args, **kwargs):
        """Stamp ``modified`` with the current time, then save.

        Positional and keyword arguments are forwarded unchanged to the
        parent ``save`` and its return value is handed back to the caller.
        """
        self.modified = datetime.now()
        parent_save = super(Base, self).save
        return parent_save(*args, **kwargs)
class JsapiTicket(Base):
    """A ticket string together with its lifetime in seconds."""

    # Both columns are nullable, so either attribute may be None.
    ticket = CharField(null=True)
    expire = IntegerField(null=True)

    def __unicode__(self):
        """Describe the ticket content and its lifetime in one line.

        ``expire`` is rendered with ``%s`` rather than ``%d`` because the
        column is nullable: ``%d`` raises TypeError for None, while integer
        values render identically with either conversion.
        """
        return 'ticket content:%s expires in %s seconds' % (self.ticket, self.expire)
"""
A scheduler used with flask-peewee, APScheduler and uWSGI.
Run with uwsgi:
uwsgi --http-socket :4000 --mule=scheduled_ticket.py --wsgi-file=main.py
"""
from datetime import datetime, timedelta
import os
from apscheduler.schedulers.blocking import BlockingScheduler
from models import JsapiTicket
def get_ticket():
    """Generate a new random ticket and its lifetime.

    Returns:
        A ``(ticket, expire)`` tuple. ``ticket`` is a 48-character lowercase
        hexadecimal string built from 24 random bytes; ``expire`` is the
        lifetime in seconds and is always 20.
    """
    # Imported locally so this function does not depend on the module's
    # import block being changed.
    import binascii

    # ``bytes.encode('hex')`` exists only on Python 2; hexlify works on both.
    ticket_content = binascii.hexlify(os.urandom(24))
    if not isinstance(ticket_content, str):
        # Python 3 yields bytes here; return the native string type instead.
        ticket_content = ticket_content.decode('ascii')
    return ticket_content, 20
def refresh_ticket(jsapi_ticket):
    """Give ``jsapi_ticket`` a new random ticket string and save it.

    Only the ``ticket`` attribute is replaced; ``expire`` is left as it is.

    Args:
        jsapi_ticket: object with a writable ``ticket`` attribute and a
            ``save()`` method.

    Returns:
        Whatever ``jsapi_ticket.save()`` returns.
    """
    # Imported locally so this function does not depend on the module's
    # import block being changed.
    import binascii

    # ``bytes.encode('hex')`` exists only on Python 2; hexlify works on both.
    fresh = binascii.hexlify(os.urandom(24))
    if not isinstance(fresh, str):
        # Python 3 yields bytes here; store the native string type instead.
        fresh = fresh.decode('ascii')
    jsapi_ticket.ticket = fresh
    return jsapi_ticket.save()
def populate_ticket(jsapi_ticket):
    """Copy a new ticket and lifetime from ``get_ticket`` onto the record.

    The record is only changed in memory; saving is left to the caller.
    Always returns True, which lets the call sit inside an ``and`` chain.
    """
    jsapi_ticket.ticket, jsapi_ticket.expire = get_ticket()
    return True
def get_or_refresh_jsapi_ticket():
    """Create, repopulate or refresh the single ticket row (id 1).

    Exactly one of these happens on each call:

    * the row was just created and is populated and saved;
    * the row exists but has no ``expire`` value, so it is populated and
      saved (previously this case raised TypeError on every call, because
      ``timedelta(seconds=None)`` is invalid);
    * more than ``expire`` seconds have passed since ``modified``, so the
      row is populated again and saved;
    * otherwise the ticket string is replaced via ``refresh_ticket``.

    Returns:
        The ``JsapiTicket`` instance that was handled.
    """
    jsapi_ticket, created = JsapiTicket.get_or_create(id=1)
    if created and populate_ticket(jsapi_ticket) and jsapi_ticket.save():
        print('Create ticket %s' % jsapi_ticket)
    elif jsapi_ticket.expire is None:
        # The record itself is not formatted into the message: its text
        # representation may rely on ``expire`` being a number.
        print('Ticket has no expiry recorded; populating it')
        populate_ticket(jsapi_ticket)
        jsapi_ticket.save()
    elif datetime.now() - timedelta(seconds=jsapi_ticket.expire) > jsapi_ticket.modified:
        print('Ticket %s expired' % jsapi_ticket)
        populate_ticket(jsapi_ticket)
        jsapi_ticket.save()
    else:
        refresh_ticket(jsapi_ticket)
        print('Ticket %s refreshed' % jsapi_ticket)
    return jsapi_ticket
def tick():
    """Print a tick message containing the current local time.

    This function is not registered with the scheduler in this script.
    """
    now = datetime.now()
    print('Tick! The time is: %s' % (now,))
if __name__ == '__main__':
    # Create the ticket table if needed; an already existing table is ignored.
    JsapiTicket.create_table(fail_silently=True)

    # Send the APScheduler executor's debug output to the default stream handler.
    import logging
    logger = logging.getLogger('apscheduler.executors.default')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.StreamHandler())

    # Run the ticket job every five seconds.
    scheduler = BlockingScheduler()
    scheduler.add_job(get_or_refresh_jsapi_ticket, 'interval', seconds=5)

    exit_key = 'Break' if os.name == 'nt' else 'C'
    print('Press Ctrl+{0} to exit'.format(exit_key))

    try:
        # Blocks until the scheduler is interrupted.
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        # Interrupt or exit request: leave quietly.
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment