Skip to content

Instantly share code, notes, and snippets.

@FitzAfful
Last active January 10, 2025 14:58
Show Gist options
  • Save FitzAfful/8aaf75544f6d39025445cf4d11c2191d to your computer and use it in GitHub Desktop.
Save FitzAfful/8aaf75544f6d39025445cf4d11c2191d to your computer and use it in GitHub Desktop.
Second iteration of quiz_session_consumer.py
import logging
from typing import Tuple, Dict, Any, Union
from channels.db import database_sync_to_async
from django.core.exceptions import ObjectDoesNotExist
from ..models import QuizSession, QuizParticipant, Answer, QuizSessionQuestion, UserAnswer
from ..models.multiplayer_models import MultiplayerSession
from ..serializers import FirebaseUserSerializer, QuizParticipantSerializer
logger = logging.getLogger(__name__)
@database_sync_to_async
def is_quiz_active(quiz_session):
return quiz_session.start_time is not None and quiz_session.end_time is None and not quiz_session.is_completed
@database_sync_to_async
def get_quiz_session(session_id: str) -> QuizSession:
return QuizSession.objects.get(id=session_id)
@database_sync_to_async
def save_quiz_session(quiz_session):
quiz_session.save()
@database_sync_to_async
def update_player_ready_status(participant_id, is_ready) -> bool:
try:
participant = QuizParticipant.objects.get(id=participant_id)
participant.is_ready = is_ready
participant.save()
logger.info(f"Updated ready status for participant {participant_id} to {is_ready}")
return True
except ObjectDoesNotExist:
logger.error(f"Participant with id {participant_id} not found")
return False
@database_sync_to_async
def check_players_ready(session_id, min_players=2) -> bool:
quiz_session = QuizSession.objects.get(id=session_id)
ready_players_count = QuizParticipant.objects.filter(
quiz_session=quiz_session,
is_ready=True,
is_connected=True
).count()
logger.info(f'Current players ready: {ready_players_count}')
return ready_players_count >= min_players
@database_sync_to_async
def get_participants(session_id: str) -> list[QuizParticipant]:
""" Get all participants in the current session. """
quiz_session = QuizSession.objects.get(id=session_id)
return list(QuizParticipant.objects.filter(quiz_session=quiz_session))
@database_sync_to_async
def get_correct_answer_id(question_id) -> str:
return str(Answer.objects.get(question_id=question_id, is_correct=True).answer_id)
@database_sync_to_async
def check_all_disconnected(session_id: str) -> bool:
quiz_session = QuizSession.objects.get(id=session_id)
connected_participants = QuizParticipant.objects.filter(
quiz_session=quiz_session,
is_connected=True
).count()
if connected_participants == 0:
quiz_session.is_completed = True
quiz_session.save()
return True
return False
@database_sync_to_async
def update_participant_connection_status(participant_id, is_connected):
QuizParticipant.objects.filter(id=participant_id).update(is_connected=is_connected)
@database_sync_to_async
def serialize_user(user) -> Dict:
return FirebaseUserSerializer(user).data
@database_sync_to_async
def serialize_participant(participant: QuizParticipant) -> Dict:
"""
Serialize a QuizParticipant instance into a dictionary.
Args:
participant (QuizParticipant): The participant to serialize.
Returns:
dict: The serialized data of the participant.
"""
return QuizParticipantSerializer(participant).data
@database_sync_to_async
def get_players_status(session_id: str):
quiz_session = QuizSession.objects.get(id=session_id)
participants = QuizParticipant.objects.filter(quiz_session=quiz_session)
return [
{
'id': str(p.user.id),
'user': FirebaseUserSerializer(p.user).data,
'is_host': p.is_host,
'is_ready': p.is_ready,
'score': p.score
} for p in participants
]
@database_sync_to_async
def get_total_scores(session_id: str, user, is_multiplayer: bool = False):
quiz_session = QuizSession.objects.get(id=session_id)
participants = QuizParticipant.objects.filter(quiz_session=quiz_session)
scores = {}
if is_multiplayer:
for participant in participants:
# Distinguish current user from opponent in multiplayer
if participant.user == user:
scores['your_score'] = participant.score
else:
scores['opponent_score'] = participant.score
else:
# In single player, just get the user's score
participant = participants.get(user=user)
scores['your_score'] = participant.score
return scores
@database_sync_to_async
def get_current_question(quiz_session, current_question_number):
question_session = QuizSessionQuestion.objects.select_related('question').filter(
quiz_session=quiz_session,
order=current_question_number
).first()
if question_session:
question = question_session.question
return {
'session_question_id': str(question_session.id),
'question_id': question.question_id,
'question_text': question.question_text,
'question_image_url': question.question_image_url,
'question_difficulty': question.get_question_difficulty_display(),
'answers': [{'answer_id': answer.answer_id, 'answer_text': answer.answer_text} for answer in
question.answers.all()],
'correct_answer_id': Answer.objects.get(question=question, is_correct=True).answer_id
}
return None
@database_sync_to_async
def get_total_questions(session_id: str) -> int:
quiz_session = QuizSession.objects.get(id=session_id)
return max(quiz_session.question_count, 1)
@database_sync_to_async
def check_if_multiplayer(session_id) -> bool:
try:
MultiplayerSession.objects.get(quiz_session_id=session_id)
return True
except ObjectDoesNotExist:
return False
@database_sync_to_async
def get_or_create_participant(session_id: str, user) -> Union[tuple[Any, Any], tuple[None, bool]]:
"""
Retrieve the participant if they exist. If not, create a new participant.
Args:
session_id (str): The ID of the quiz session.
user (User): The user for whom to get or create the participant.
Returns:
Tuple[Optional[QuizParticipant], bool]: A tuple containing the participant instance (or None if not found)
and a boolean indicating whether a new participant was created.
"""
try:
quiz_session = QuizSession.objects.get(id=session_id)
participant, created = QuizParticipant.objects.get_or_create(
quiz_session=quiz_session,
user=user,
defaults={'is_connected': True}
)
return participant, created # Return whether it's a new participant
except ObjectDoesNotExist:
logger.error(f"Quiz session with id {session_id} not found")
return None, False # Return None and False to indicate no participant and not created
@database_sync_to_async
def get_next_question(quiz_session, total_questions, current_question_number):
if current_question_number >= total_questions:
return None
answered_questions = UserAnswer.objects.filter(
quiz_participant__quiz_session=quiz_session
).values_list('quiz_session_question_id', flat=True)
return QuizSessionQuestion.objects.filter(quiz_session=quiz_session).exclude(
id__in=answered_questions).order_by('order').first()
import json
import logging
import time
from datetime import datetime
from uuid import UUID
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from ..models import QuizParticipant, Answer, QuizSession, QuizSessionQuestion, UserAnswer, XPProfile, \
AllTimeLeaderboardScore, MonthlyLeaderboardScore
from ..serializers import DetailedQuizSessionSerializer
from ..services import quiz_db_service
from ..services.quiz_timer_service import QuizTimerManager
logger = logging.getLogger(__name__)
class UUIDEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, UUID):
return str(obj)
return super().default(obj)
class QuizConsumer(AsyncWebsocketConsumer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.room_group_name = None
self.session_id = None
self.question_start_time = None
self.timer_task = None
self.current_question_number = 0
self.streak = 0
self.current_stage = None
self.is_multiplayer = False
self.submitted_answers = {}
self.total_questions = 0
self.timer_manager = QuizTimerManager()
async def connect(self):
self.session_id = self.scope['url_route']['kwargs']['session_id']
self.room_group_name = f'quiz_{self.session_id}'
self.timer_manager.set_channel_info(self.channel_layer, self.room_group_name)
logger.info(f"Attempting connection for session {self.session_id}")
self.is_multiplayer = await quiz_db_service.check_if_multiplayer(self.session_id)
self.total_questions = await quiz_db_service.get_total_questions(self.session_id)
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
logger.info(f"Connection accepted for session {self.session_id}")
# Get or create the participant
participant, created = await quiz_db_service.get_or_create_participant(self.session_id, user=self.scope['user'])
if created:
# New participant
logger.info("New connection, created participant")
if self.is_multiplayer:
serialized_participant = await quiz_db_service.serialize_participant(participant)
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'player_joined',
'player': serialized_participant
}
)
else:
# Existing participant, check if it was previously disconnected
if not participant.is_connected:
logger.info(f"Reconnection for participant {participant.id}")
await self.handle_reconnection(participant)
else:
logger.info(f"Participant {participant.id} is already connected")
# Set participant as connected
await quiz_db_service.update_participant_connection_status(participant.id, True)
# Send connection acknowledgment
await self.send(text_data=json.dumps({
'type': 'connection_acknowledged',
'is_multiplayer': self.is_multiplayer
}))
# For single player games, automatically start the quiz
if not self.is_multiplayer:
logger.info("Single player game - automatically starting quiz")
await self.start_quiz()
async def handle_reconnection(self, participant):
await quiz_db_service.update_participant_connection_status(participant.id, True)
# Multiplayer: Notify others about the reconnection
if self.is_multiplayer:
serialized_participant = await quiz_db_service.serialize_participant(participant)
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'player_reconnected',
'player': serialized_participant
}
)
# Send the quiz state to the reconnected player (both single and multiplayer)
await self.send_quiz_state()
# Resume the quiz flow if the quiz is active (single or multiplayer)
quiz_session = await quiz_db_service.get_quiz_session(self.session_id)
if await quiz_db_service.is_quiz_active(quiz_session):
await self.resume_quiz_flow()
async def disconnect(self, close_code):
try:
logger.info(f"Disconnecting. Close code: {close_code}")
logger.info(f"Current stage: {self.current_stage}")
logger.info(f"Current question: {self.current_question_number}/{self.total_questions}")
await self.timer_manager.cancel_timer()
participant, _ = await quiz_db_service.get_or_create_participant(self.session_id, user=self.scope['user'])
if participant:
logger.info(f"Participant {participant.id} disconnecting")
await quiz_db_service.update_participant_connection_status(participant.id, False)
if self.is_multiplayer:
logger.info(f"Multiplayer disconnect for user {participant.user.username}")
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'user_disconnect',
'user_id': str(participant.user.id),
'username': participant.user.username,
'avatar': participant.user.avatar
}
)
if self.is_multiplayer:
all_disconnected = await quiz_db_service.check_all_disconnected(self.session_id)
logger.info(f"All participants disconnected: {all_disconnected}")
if all_disconnected:
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'quiz_abandoned',
'message': 'All participants disconnected. Quiz ended.'
}
)
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
logger.info(f"Disconnected successfully from session {self.session_id}")
except Exception as e:
logger.error(f"Error during disconnect: {str(e)}", exc_info=True)
raise
async def receive(self, text_data):
text_data_json = json.loads(text_data)
message_type = text_data_json['type']
logger.info(f"Received message of type: {message_type}")
if message_type == 'submit_answer':
logger.info(f"Received submit_answer message: {text_data_json}")
await self.handle_submit_answer(text_data_json)
elif message_type == 'player_ready':
await self.handle_player_ready()
else:
logger.warning(f"Unknown message type: {message_type}")
async def player_joined(self, event):
await self.send(text_data=json.dumps({
'type': 'player_joined',
'player': event['player']
}))
async def player_left(self, event):
await self.send(text_data=json.dumps({
'type': 'player_left',
'player': event['player']
}))
async def handle_submit_answer(self, data):
""" Handle player answer submission. """
try:
if self.current_stage == 'answering':
participant, _ = await quiz_db_service.get_or_create_participant(self.session_id, self.scope['user'])
if not participant:
logger.error("Failed to get or create participant")
return
logger.info(f"Processing answer submission for participant {participant.id}")
await self.submit_answer(data['question_id'], data['answer_id'])
# Mark this participant as having submitted their answer
self.submitted_answers[participant.id] = True
logger.info(f"Player {participant.id} has submitted their answer.")
# Check if all players have submitted their answers
if await self.all_players_submitted():
logger.info("All players have submitted, calling time_up")
await self.time_up()
else:
logger.warning(f"Received answer submission during {self.current_stage} stage. Ignoring.")
except Exception as e:
logger.error(f"Error in handle_submit_answer: {str(e)}", exc_info=True)
raise
async def all_players_submitted(self):
""" Check if all players in the game have submitted their answers. """
try:
participants = await quiz_db_service.get_participants(self.session_id)
for participant in participants:
if participant.id not in self.submitted_answers or not self.submitted_answers[participant.id]:
return False
return True
except Exception as e:
logger.error(f"Error checking all players submitted: {str(e)}")
return False
async def all_players_ready(self, event):
await self.send(text_data=json.dumps({
'type': 'all_players_ready'
}))
# Ensure quiz only starts after all players are ready
if self.is_multiplayer:
self.current_stage = 'show_countdown'
# Use timer manager for countdown
await self.timer_manager.start_countdown_timer(
duration=3,
send_update=lambda remaining: self.send(text_data=json.dumps({
'type': 'timer_update',
'stage': self.current_stage,
'duration': remaining
})),
on_complete=self.start_quiz
)
else:
await self.start_quiz()
async def start_quiz(self):
logger.info("Starting quiz.")
quiz_session = await quiz_db_service.get_quiz_session(self.session_id)
if quiz_session.is_completed:
await self.send(text_data=json.dumps({
'type': 'error',
'message': 'This quiz has already been completed.'
}))
return
# Set the quiz as started in the database
quiz_session.is_started = True
await quiz_db_service.save_quiz_session(quiz_session)
# Send the initial quiz state
await self.send_quiz_state()
# Start the quiz by sending the first question
await self.send_next_question()
async def send_next_question(self):
try:
quiz_session = await quiz_db_service.get_quiz_session(self.session_id)
quiz_session_question = await quiz_db_service.get_next_question(quiz_session, self.total_questions, self.current_question_number)
if quiz_session_question and self.current_question_number < self.total_questions:
self.current_question_number += 1
question_data = await quiz_db_service.get_current_question(quiz_session, self.current_question_number)
await self.send(text_data=json.dumps({
'type': 'question',
'question_number': self.current_question_number,
'total_questions': self.total_questions,
'current_question': question_data,
'is_multiplayer': self.is_multiplayer
}))
await self.start_round_intro()
else:
logger.info("No more questions available or reached total questions, sending quiz end.")
await self.send_quiz_end()
except Exception as e:
logger.error(f"Error in send_next_question: {str(e)}")
await self.send(text_data=json.dumps({
'type': 'error',
'message': 'An error occurred while fetching the next question.'
}))
async def handle_player_ready(self):
logger.info("Handle player ready.")
if self.is_multiplayer:
participant, created = await quiz_db_service.get_or_create_participant(self.session_id, user=self.scope['user'])
if participant:
success = await quiz_db_service.update_player_ready_status(participant.id, True)
serialized_participant = await quiz_db_service.serialize_participant(participant)
if success:
# Broadcast the player_ready event to the entire room, not just the sending client
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'broadcast_player_ready',
'player': serialized_participant
}
)
logger.info("Player ready.")
# Check if enough players are ready
enough_players_ready = await quiz_db_service.check_players_ready(self.session_id)
if enough_players_ready:
logger.info("Enough players ready.")
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'all_players_ready'
}
)
else:
logger.info("Waiting for more players to be ready.")
else:
logger.info("Could not update player ready status.")
await self.send(text_data=json.dumps({
'type': 'error',
'message': 'Failed to update ready status'
}))
else:
logger.info("Player not found.")
await self.send(text_data=json.dumps({
'type': 'error',
'message': 'Participant not found'
}))
else:
await self.start_quiz()
async def player_ready(self, event):
# Broadcast the player_ready event to all clients in the group
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'broadcast_player_ready',
'player': event['player']
}
)
async def broadcast_player_ready(self, event):
await self.send(text_data=json.dumps({
'type': 'player_ready',
'player': event['player']
}))
async def start_round_intro(self):
self.current_stage = 'show_round'
await self.send_timer_update('show_round', 1.5)
await self.timer_manager.start_timer(1.5, self.show_question)
async def show_question(self):
self.current_stage = 'show_question'
await self.send_timer_update('show_question', 1.5)
await self.timer_manager.start_timer(1.5, self.start_answering)
async def start_answering(self):
""" Start answering timer and wait for answers from all participants. """
await self.timer_manager.cancel_timer()
self.current_stage = 'answering'
self.question_start_time = time.time()
self.submitted_answers = {}
await self.timer_manager.start_answering_timer(
duration=10,
time_up_callback=self.time_up,
all_submitted_check=self.all_players_submitted
)
async def broadcast_timer_update(self, event):
"""Handler for broadcast timer updates"""
await self.send_timer_update(event['stage'], event['duration'])
async def handle_timer_update(self, event):
"""Handle timer updates received through the channel layer"""
await self.send_timer_update(event['stage'], event['duration'])
async def time_up(self):
"""Proceed to the next step once all answers are in or time is up."""
logger.info("Time up called")
# Cancel any existing timer
await self.timer_manager.cancel_timer()
# Notify all participants of the end of the round
await self.broadcast_question_completed()
# Proceed to showing the correct answer
await self.show_correct_answer()
async def send_timer_update(self, stage, duration):
await self.send(text_data=json.dumps({
'type': 'timer_update',
'stage': stage,
'duration': duration
}))
async def broadcast_question_completed(self):
""" Notify all participants that the question is completed. """
scores = await quiz_db_service.get_total_scores(self.session_id, self.scope['user'], is_multiplayer=self.is_multiplayer)
# Different response structure for single vs multiplayer
response_data = {
'type': 'question_completed',
'question_id': self.current_question_number,
}
if self.is_multiplayer:
response_data['scores'] = {
'your_score': scores['your_score'],
'opponent_score': scores['opponent_score']
}
else:
response_data['scores'] = {
'your_score': scores['your_score']
}
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'broadcast_answer',
'data': response_data
}
)
async def broadcast_answer(self, event):
""" Broadcast the question completed status to all participants. """
await self.send(text_data=json.dumps(event['data']))
async def submit_answer(self, question_id, answer_id):
await self.timer_manager.cancel_timer()
if self.question_start_time:
time_taken = time.time() - self.question_start_time
else:
time_taken = 10
is_correct, score = await self.save_user_answer(question_id, answer_id, time_taken)
scores = await quiz_db_service.get_total_scores(self.session_id, self.scope['user'], is_multiplayer=self.is_multiplayer)
# Different response structure for single vs multiplayer
response_data = {
'type': 'question_completed',
'question_id': question_id,
}
if self.is_multiplayer:
response_data['scores'] = {
'your_score': scores['your_score'],
'opponent_score': scores['opponent_score']
}
else:
response_data['scores'] = {
'your_score': scores['your_score']
}
if self.is_multiplayer:
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'broadcast_answer',
'data': response_data
}
)
else:
await self.send(text_data=json.dumps(response_data))
await self.show_correct_answer()
async def show_correct_answer(self):
self.current_stage = 'show_correct_answer'
await self.send_timer_update('show_correct_answer', 3)
await self.timer_manager.cancel_timer()
if self.current_question_number < self.total_questions:
await self.timer_manager.start_timer(3, self.send_next_question)
else:
await self.timer_manager.start_timer(3, self.send_quiz_end)
async def send_quiz_end(self):
results = await self.end_quiz()
# This broadcasts to all consumers in the group, not to clients directly
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'quiz_ended',
'results': results
}
)
async def quiz_ended(self, event):
# This sends the actual WebSocket message to the client
# This is the only message the client receives
await self.send(text_data=json.dumps({
'type': 'quiz_ended',
'results': event['results']
}))
async def resume_quiz_flow(self):
if self.current_stage == 'show_round':
await self.start_round_intro()
elif self.current_stage == 'show_question':
await self.show_question()
elif self.current_stage == 'answering':
await self.start_answering()
elif self.current_stage == 'show_correct_answer':
await self.show_correct_answer()
else:
logger.warning(f"Unknown stage {self.current_stage}, starting from the beginning")
await self.start_round_intro()
@database_sync_to_async
def save_user_answer(self, question_id, answer_id, time_taken):
try:
quiz_session = QuizSession.objects.get(id=self.session_id)
quiz_session_question = QuizSessionQuestion.objects.get(quiz_session=quiz_session, question_id=question_id)
participant = QuizParticipant.objects.get(quiz_session=quiz_session, user=self.scope["user"])
if answer_id:
answer = Answer.objects.get(answer_id=answer_id)
is_correct = answer.is_correct
else:
is_correct = False
# Calculate score based on the difficulty and remaining time
if quiz_session.difficulty == 'easy':
base_score = max(1, 20 - time_taken)
elif quiz_session.difficulty == 'medium':
base_score = max(5, 30 - time_taken)
elif quiz_session.difficulty == 'hard':
base_score = max(10, 40 - time_taken)
else:
base_score = max(5, 30 - time_taken)
# Apply streak and bonuses
if self.current_question_number == self.total_questions:
base_score *= 2
base_score += 20 - time_taken
if is_correct:
self.streak += 1
if self.streak == 3:
base_score += 5
elif self.streak == 5:
base_score += 10
elif self.streak >= 10:
base_score += 20
else:
self.streak = 0
# Save the answer and update the score
UserAnswer.objects.create(
quiz_participant=participant,
quiz_session_question=quiz_session_question,
answer_id=answer_id if answer_id is not None else None,
is_correct=is_correct,
time_taken=time_taken,
score=base_score
)
participant.score += base_score
participant.save()
return is_correct, base_score
except ObjectDoesNotExist as e:
logger.error(f"Error in save_user_answer: {str(e)}")
return None, 0
@database_sync_to_async
def end_quiz(self):
quiz_session = QuizSession.objects.get(id=self.session_id)
participants = QuizParticipant.objects.filter(quiz_session=quiz_session)
logger.info("End Quiz triggered")
highest_score = 0
winners = []
# Calculate the highest score and identify winners
for participant in participants:
correct_answers = UserAnswer.objects.filter(
quiz_participant=participant, is_correct=True
).count()
participant.correct_answers = correct_answers
if participant.score > highest_score:
highest_score = participant.score
winners = [participant]
elif participant.score == highest_score:
winners.append(participant)
# Apply victory bonus to winners and save participants
victory_bonus = 50
for winner in winners:
winner.score += victory_bonus
winner.save()
# Mark quiz as completed
quiz_session.is_completed = True
quiz_session.save()
# Add XP for each participant and handle level-up logic
for participant in participants:
# Calculate XP based on the quiz score or performance criteria
base_xp = participant.score
if participant in winners:
base_xp += victory_bonus # Additional XP for winners
profile, created = XPProfile.objects.get_or_create(user=participant.user)
profile.add_xp(base_xp) # This will handle level-up if needed
# Update Leaderboard with points
self.update_leaderboards(participant.user, quiz_session.category, base_xp)
# Serialize and return the session results
detailed_serializer = DetailedQuizSessionSerializer(quiz_session)
result_data = detailed_serializer.data
return result_data
async def send_quiz_state(self):
quiz_session = await quiz_db_service.get_quiz_session(self.session_id)
current_question = await quiz_db_service.get_current_question(quiz_session, self.current_question_number)
if self.total_questions == 0:
self.total_questions = await quiz_db_service.get_total_questions(self.session_id)
state_data = {
'type': 'quiz_state',
'question_number': self.current_question_number,
'total_questions': self.total_questions,
'current_stage': self.current_stage,
'is_multiplayer': self.is_multiplayer
}
if current_question:
state_data['current_question'] = current_question
else:
state_data['message'] = 'No current question available.'
if self.is_multiplayer:
state_data['players'] = await quiz_db_service.get_players_status(self.session_id)
await self.send(text_data=json.dumps(state_data))
async def user_disconnect(self, event):
await self.send(text_data=json.dumps({
'type': 'user_disconnected',
'user_id': event['user_id'],
'username': event['username'],
'avatar': event['avatar']
}))
async def user_reconnected(self, event):
await self.send(text_data=json.dumps({
'type': 'user_reconnected',
'user_id': event['user_id'],
'username': event['username'],
'avatar': event['avatar']
}))
async def quiz_abandoned(self, event):
await self.send(text_data=json.dumps({
'type': 'quiz_abandoned',
'message': event['message']
}))
def update_leaderboards(self, user, category, points):
current_date = datetime.now()
year = current_date.year
month = current_date.month
with transaction.atomic():
# Update MonthlyLeaderboardScore
monthly_score, created = MonthlyLeaderboardScore.objects.get_or_create(
user=user, category=category, year=year, month=month,
defaults={'score': 0}
)
monthly_score.score += points
monthly_score.save()
# Update AllTimeLeaderboardScore
all_time_score, created = AllTimeLeaderboardScore.objects.get_or_create(
user=user, category=category,
defaults={'score': 0}
)
all_time_score.score += points
all_time_score.save()
import asyncio
import logging
from typing import Optional, Callable
logger = logging.getLogger(__name__)
class QuizTimerManager:
def __init__(self, channel_layer=None, room_group_name=None):
self.timer_task: Optional[asyncio.Task] = None
self.channel_layer = channel_layer
self.room_group_name = room_group_name
self.lock = asyncio.Lock()
def set_channel_info(self, channel_layer, room_group_name):
"""Update channel layer information"""
self.channel_layer = channel_layer
self.room_group_name = room_group_name
async def start_timer(self, duration: float, next_stage: Callable) -> None:
"""Start a basic timer that executes the next_stage callback after duration"""
async with self.lock:
await self.cancel_timer()
self.timer_task = asyncio.create_task(self._timer(duration, next_stage))
async def start_countdown_timer(self, duration: int, send_update: Callable, on_complete: Callable) -> None:
"""Start a countdown timer that sends updates each second
Args:
duration: Time in seconds for countdown
send_update: Callback to send updates to client
on_complete: Callback to execute when countdown finishes
"""
await self.cancel_timer()
self.timer_task = asyncio.create_task(self._countdown_timer(duration, send_update, on_complete))
async def _countdown_timer(self, duration: int, send_update: Callable, on_complete: Callable) -> None:
"""Internal countdown timer implementation with updates each second"""
try:
for i in range(duration, 0, -1):
await send_update(i)
await asyncio.sleep(1)
await on_complete()
except asyncio.CancelledError:
logger.info("Countdown timer cancelled")
raise
except Exception as e:
logger.error(f"Error in countdown timer: {str(e)}", exc_info=True)
raise
async def start_answering_timer(self, duration: int, time_up_callback: Callable,
all_submitted_check: Callable) -> None:
"""
Start the answering phase timer with periodic updates
Args:
duration: Time in seconds for answering
time_up_callback: Callback to execute when time is up
all_submitted_check: Function to check if all players submitted
"""
await self.cancel_timer()
self.timer_task = asyncio.create_task(
self._answering_timer(duration, time_up_callback, all_submitted_check)
)
async def _timer(self, duration: float, next_stage: Callable) -> None:
"""Internal timer implementation"""
try:
await asyncio.sleep(duration)
await next_stage()
except asyncio.CancelledError:
logger.info("Timer cancelled")
raise
except Exception as e:
logger.error(f"Error in timer: {str(e)}", exc_info=True)
raise
async def _answering_timer(self, duration: int, time_up_callback: Callable,
all_submitted_check: Callable) -> None:
"""Internal answering timer implementation with periodic updates"""
try:
remaining_time = duration
logger.info(f"Starting answering timer for {duration} seconds")
while remaining_time > 0:
# Send timer update to all participants
if self.channel_layer and self.room_group_name:
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'broadcast_timer_update',
'stage': 'answering',
'duration': remaining_time
}
)
# Check if all players submitted their answers
if await all_submitted_check():
logger.info("All players submitted their answers")
break
await asyncio.sleep(1)
remaining_time -= 1
# Call time_up callback when timer ends
await time_up_callback()
except asyncio.CancelledError:
logger.info("Answering timer cancelled")
raise
except Exception as e:
logger.error(f"Error in answering timer: {str(e)}", exc_info=True)
raise
async def cancel_timer(self) -> None:
"""Cancel any running timer"""
try:
if self.timer_task and not self.timer_task.done():
logger.info("Cancelling timer task")
self.timer_task.cancel()
try:
await self.timer_task
except asyncio.CancelledError:
logger.info("Timer task cancelled successfully")
except Exception as e:
logger.error(f"Error while awaiting cancelled timer: {str(e)}", exc_info=True)
self.timer_task = None
except Exception as e:
logger.error(f"Error in cancel_timer: {str(e)}", exc_info=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment