Created
January 8, 2017 05:41
-
-
Save mazz/38864c91438fa7111c8eecd9d7eb929b to your computer and use it in GitHub Desktop.
my_task.apply_async() not firing?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyramid.httpexceptions import HTTPNotFound, HTTPFound | |
from websauna.utils.slug import uuid_to_slug | |
from websauna.utils.slug import slug_to_uuid | |
from websauna.system.core import messages | |
from websauna.system.http import Request | |
from websauna.system.core.route import simple_route | |
from email.header import Header | |
from email.utils import formataddr | |
from websauna.system.mail import send_templated_mail | |
from .models import Media | |
from .tasks import my_task | |
import pafy | |
# Configure view named home at path / using a template hearit/home.html | |
@simple_route("/", route_name="home", renderer="hearit/home.html") | |
def home(request: Request): | |
"""Render the site homepage.""" | |
latest_media_list = request.dbsession.query(Media).order_by(Media.published_at.desc()).all()[:10] | |
return locals() | |
@simple_route("/media/{media_uuid}", route_name="detail", renderer="hearit/detail.html") | |
def detail(request): | |
# Convert base64 encoded UUID string from request path to Python UUID object | |
media_uuid = slug_to_uuid(request.matchdict["media_uuid"]) | |
media = request.dbsession.query(Media).filter_by(uuid=media_uuid).one_or_none() | |
if not media: | |
raise HTTPNotFound() | |
if request.method == "POST": | |
media = request.dbsession.query(Media).filter_by(uuid=media_uuid).first() | |
video = pafy.new(media.url) | |
media.title = video.title | |
media.duration = video.duration | |
media.description = video.description | |
bestaudio = video.getbestaudio() | |
print("bestaudio: {0}".format(bestaudio)) | |
my_task.apply_async(args=(9,), tm=request.transaction_manager) | |
if request.user: | |
# my_task.apply_async(tm=request.transaction_manager) | |
# send_review_sms_notification.apply_async(args=(delivery.id,), tm=request.transaction_manager) | |
send_templated_mail(request, [request.user.email], "email/audio_downloaded", context={}, sender="[email protected]", immediate=True) | |
request.dbsession.flush() | |
messages.add(request, msg="got the post", kind="success") | |
return HTTPFound(request.route_url("fetch_media_result", media_uuid=uuid_to_slug(media.uuid))) | |
if not media: | |
raise HTTPNotFound() | |
return locals() | |
@simple_route("/media/{media_uuid}/fetch_media_result", route_name="fetch_media_result", renderer="hearit/fetch_media_result.html") | |
def fetch_media_result(request: Request): | |
# Convert base64 encoded UUID string from request path to Python UUID object | |
media_uuid = slug_to_uuid(request.matchdict["media_uuid"]) | |
media = request.dbsession.query(Media).filter_by(uuid=media_uuid).first() | |
if not media: | |
raise HTTPNotFound() | |
return locals() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
log