Last active
January 10, 2025 14:58
-
-
Save FitzAfful/8aaf75544f6d39025445cf4d11c2191d to your computer and use it in GitHub Desktop.
Second iteration of quiz_session_consumer.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from typing import Tuple, Dict, Any, Union | |
from channels.db import database_sync_to_async | |
from django.core.exceptions import ObjectDoesNotExist | |
from ..models import QuizSession, QuizParticipant, Answer, QuizSessionQuestion, UserAnswer | |
from ..models.multiplayer_models import MultiplayerSession | |
from ..serializers import FirebaseUserSerializer, QuizParticipantSerializer | |
logger = logging.getLogger(__name__) | |
@database_sync_to_async | |
def is_quiz_active(quiz_session): | |
return quiz_session.start_time is not None and quiz_session.end_time is None and not quiz_session.is_completed | |
@database_sync_to_async | |
def get_quiz_session(session_id: str) -> QuizSession: | |
return QuizSession.objects.get(id=session_id) | |
@database_sync_to_async | |
def save_quiz_session(quiz_session): | |
quiz_session.save() | |
@database_sync_to_async | |
def update_player_ready_status(participant_id, is_ready) -> bool: | |
try: | |
participant = QuizParticipant.objects.get(id=participant_id) | |
participant.is_ready = is_ready | |
participant.save() | |
logger.info(f"Updated ready status for participant {participant_id} to {is_ready}") | |
return True | |
except ObjectDoesNotExist: | |
logger.error(f"Participant with id {participant_id} not found") | |
return False | |
@database_sync_to_async | |
def check_players_ready(session_id, min_players=2) -> bool: | |
quiz_session = QuizSession.objects.get(id=session_id) | |
ready_players_count = QuizParticipant.objects.filter( | |
quiz_session=quiz_session, | |
is_ready=True, | |
is_connected=True | |
).count() | |
logger.info(f'Current players ready: {ready_players_count}') | |
return ready_players_count >= min_players | |
@database_sync_to_async | |
def get_participants(session_id: str) -> list[QuizParticipant]: | |
""" Get all participants in the current session. """ | |
quiz_session = QuizSession.objects.get(id=session_id) | |
return list(QuizParticipant.objects.filter(quiz_session=quiz_session)) | |
@database_sync_to_async | |
def get_correct_answer_id(question_id) -> str: | |
return str(Answer.objects.get(question_id=question_id, is_correct=True).answer_id) | |
@database_sync_to_async | |
def check_all_disconnected(session_id: str) -> bool: | |
quiz_session = QuizSession.objects.get(id=session_id) | |
connected_participants = QuizParticipant.objects.filter( | |
quiz_session=quiz_session, | |
is_connected=True | |
).count() | |
if connected_participants == 0: | |
quiz_session.is_completed = True | |
quiz_session.save() | |
return True | |
return False | |
@database_sync_to_async | |
def update_participant_connection_status(participant_id, is_connected): | |
QuizParticipant.objects.filter(id=participant_id).update(is_connected=is_connected) | |
@database_sync_to_async | |
def serialize_user(user) -> Dict: | |
return FirebaseUserSerializer(user).data | |
@database_sync_to_async | |
def serialize_participant(participant: QuizParticipant) -> Dict: | |
""" | |
Serialize a QuizParticipant instance into a dictionary. | |
Args: | |
participant (QuizParticipant): The participant to serialize. | |
Returns: | |
dict: The serialized data of the participant. | |
""" | |
return QuizParticipantSerializer(participant).data | |
@database_sync_to_async | |
def get_players_status(session_id: str): | |
quiz_session = QuizSession.objects.get(id=session_id) | |
participants = QuizParticipant.objects.filter(quiz_session=quiz_session) | |
return [ | |
{ | |
'id': str(p.user.id), | |
'user': FirebaseUserSerializer(p.user).data, | |
'is_host': p.is_host, | |
'is_ready': p.is_ready, | |
'score': p.score | |
} for p in participants | |
] | |
@database_sync_to_async | |
def get_total_scores(session_id: str, user, is_multiplayer: bool = False): | |
quiz_session = QuizSession.objects.get(id=session_id) | |
participants = QuizParticipant.objects.filter(quiz_session=quiz_session) | |
scores = {} | |
if is_multiplayer: | |
for participant in participants: | |
# Distinguish current user from opponent in multiplayer | |
if participant.user == user: | |
scores['your_score'] = participant.score | |
else: | |
scores['opponent_score'] = participant.score | |
else: | |
# In single player, just get the user's score | |
participant = participants.get(user=user) | |
scores['your_score'] = participant.score | |
return scores | |
@database_sync_to_async | |
def get_current_question(quiz_session, current_question_number): | |
question_session = QuizSessionQuestion.objects.select_related('question').filter( | |
quiz_session=quiz_session, | |
order=current_question_number | |
).first() | |
if question_session: | |
question = question_session.question | |
return { | |
'session_question_id': str(question_session.id), | |
'question_id': question.question_id, | |
'question_text': question.question_text, | |
'question_image_url': question.question_image_url, | |
'question_difficulty': question.get_question_difficulty_display(), | |
'answers': [{'answer_id': answer.answer_id, 'answer_text': answer.answer_text} for answer in | |
question.answers.all()], | |
'correct_answer_id': Answer.objects.get(question=question, is_correct=True).answer_id | |
} | |
return None | |
@database_sync_to_async | |
def get_total_questions(session_id: str) -> int: | |
quiz_session = QuizSession.objects.get(id=session_id) | |
return max(quiz_session.question_count, 1) | |
@database_sync_to_async | |
def check_if_multiplayer(session_id) -> bool: | |
try: | |
MultiplayerSession.objects.get(quiz_session_id=session_id) | |
return True | |
except ObjectDoesNotExist: | |
return False | |
@database_sync_to_async | |
def get_or_create_participant(session_id: str, user) -> Union[tuple[Any, Any], tuple[None, bool]]: | |
""" | |
Retrieve the participant if they exist. If not, create a new participant. | |
Args: | |
session_id (str): The ID of the quiz session. | |
user (User): The user for whom to get or create the participant. | |
Returns: | |
Tuple[Optional[QuizParticipant], bool]: A tuple containing the participant instance (or None if not found) | |
and a boolean indicating whether a new participant was created. | |
""" | |
try: | |
quiz_session = QuizSession.objects.get(id=session_id) | |
participant, created = QuizParticipant.objects.get_or_create( | |
quiz_session=quiz_session, | |
user=user, | |
defaults={'is_connected': True} | |
) | |
return participant, created # Return whether it's a new participant | |
except ObjectDoesNotExist: | |
logger.error(f"Quiz session with id {session_id} not found") | |
return None, False # Return None and False to indicate no participant and not created | |
@database_sync_to_async | |
def get_next_question(quiz_session, total_questions, current_question_number): | |
if current_question_number >= total_questions: | |
return None | |
answered_questions = UserAnswer.objects.filter( | |
quiz_participant__quiz_session=quiz_session | |
).values_list('quiz_session_question_id', flat=True) | |
return QuizSessionQuestion.objects.filter(quiz_session=quiz_session).exclude( | |
id__in=answered_questions).order_by('order').first() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
import time | |
from datetime import datetime | |
from uuid import UUID | |
from channels.db import database_sync_to_async | |
from channels.generic.websocket import AsyncWebsocketConsumer | |
from django.core.exceptions import ObjectDoesNotExist | |
from django.db import transaction | |
from ..models import QuizParticipant, Answer, QuizSession, QuizSessionQuestion, UserAnswer, XPProfile, \ | |
AllTimeLeaderboardScore, MonthlyLeaderboardScore | |
from ..serializers import DetailedQuizSessionSerializer | |
from ..services import quiz_db_service | |
from ..services.quiz_timer_service import QuizTimerManager | |
logger = logging.getLogger(__name__) | |
class UUIDEncoder(json.JSONEncoder): | |
def default(self, obj): | |
if isinstance(obj, UUID): | |
return str(obj) | |
return super().default(obj) | |
class QuizConsumer(AsyncWebsocketConsumer): | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.room_group_name = None | |
self.session_id = None | |
self.question_start_time = None | |
self.timer_task = None | |
self.current_question_number = 0 | |
self.streak = 0 | |
self.current_stage = None | |
self.is_multiplayer = False | |
self.submitted_answers = {} | |
self.total_questions = 0 | |
self.timer_manager = QuizTimerManager() | |
async def connect(self): | |
self.session_id = self.scope['url_route']['kwargs']['session_id'] | |
self.room_group_name = f'quiz_{self.session_id}' | |
self.timer_manager.set_channel_info(self.channel_layer, self.room_group_name) | |
logger.info(f"Attempting connection for session {self.session_id}") | |
self.is_multiplayer = await quiz_db_service.check_if_multiplayer(self.session_id) | |
self.total_questions = await quiz_db_service.get_total_questions(self.session_id) | |
await self.channel_layer.group_add( | |
self.room_group_name, | |
self.channel_name | |
) | |
await self.accept() | |
logger.info(f"Connection accepted for session {self.session_id}") | |
# Get or create the participant | |
participant, created = await quiz_db_service.get_or_create_participant(self.session_id, user=self.scope['user']) | |
if created: | |
# New participant | |
logger.info("New connection, created participant") | |
if self.is_multiplayer: | |
serialized_participant = await quiz_db_service.serialize_participant(participant) | |
await self.channel_layer.group_send( | |
self.room_group_name, | |
{ | |
'type': 'player_joined', | |
'player': serialized_participant | |
} | |
) | |
else: | |
# Existing participant, check if it was previously disconnected | |
if not participant.is_connected: | |
logger.info(f"Reconnection for participant {participant.id}") | |
await self.handle_reconnection(participant) | |
else: | |
logger.info(f"Participant {participant.id} is already connected") | |
# Set participant as connected | |
await quiz_db_service.update_participant_connection_status(participant.id, True) | |
# Send connection acknowledgment | |
await self.send(text_data=json.dumps({ | |
'type': 'connection_acknowledged', | |
'is_multiplayer': self.is_multiplayer | |
})) | |
# For single player games, automatically start the quiz | |
if not self.is_multiplayer: | |
logger.info("Single player game - automatically starting quiz") | |
await self.start_quiz() | |
async def handle_reconnection(self, participant): | |
await quiz_db_service.update_participant_connection_status(participant.id, True) | |
# Multiplayer: Notify others about the reconnection | |
if self.is_multiplayer: | |
serialized_participant = await quiz_db_service.serialize_participant(participant) | |
await self.channel_layer.group_send( | |
self.room_group_name, | |
{ | |
'type': 'player_reconnected', | |
'player': serialized_participant | |
} | |
) | |
# Send the quiz state to the reconnected player (both single and multiplayer) | |
await self.send_quiz_state() | |
# Resume the quiz flow if the quiz is active (single or multiplayer) | |
quiz_session = await quiz_db_service.get_quiz_session(self.session_id) | |
if await quiz_db_service.is_quiz_active(quiz_session): | |
await self.resume_quiz_flow() | |
async def disconnect(self, close_code): | |
try: | |
logger.info(f"Disconnecting. Close code: {close_code}") | |
logger.info(f"Current stage: {self.current_stage}") | |
logger.info(f"Current question: {self.current_question_number}/{self.total_questions}") | |
await self.timer_manager.cancel_timer() | |
participant, _ = await quiz_db_service.get_or_create_participant(self.session_id, user=self.scope['user']) | |
if participant: | |
logger.info(f"Participant {participant.id} disconnecting") | |
await quiz_db_service.update_participant_connection_status(participant.id, False) | |
if self.is_multiplayer: | |
logger.info(f"Multiplayer disconnect for user {participant.user.username}") | |
await self.channel_layer.group_send( | |
self.room_group_name, | |
{ | |
'type': 'user_disconnect', | |
'user_id': str(participant.user.id), | |
'username': participant.user.username, | |
'avatar': participant.user.avatar | |
} | |
) | |
if self.is_multiplayer: | |
all_disconnected = await quiz_db_service.check_all_disconnected(self.session_id) | |
logger.info(f"All participants disconnected: {all_disconnected}") | |
if all_disconnected: | |
await self.channel_layer.group_send( | |
self.room_group_name, | |
{ | |
'type': 'quiz_abandoned', | |
'message': 'All participants disconnected. Quiz ended.' | |
} | |
) | |
await self.channel_layer.group_discard( | |
self.room_group_name, | |
self.channel_name | |
) | |
logger.info(f"Disconnected successfully from session {self.session_id}") | |
except Exception as e: | |
logger.error(f"Error during disconnect: {str(e)}", exc_info=True) | |
raise | |
async def receive(self, text_data): | |
text_data_json = json.loads(text_data) | |
message_type = text_data_json['type'] | |
logger.info(f"Received message of type: {message_type}") | |
if message_type == 'submit_answer': | |
logger.info(f"Received submit_answer message: {text_data_json}") | |
await self.handle_submit_answer(text_data_json) | |
elif message_type == 'player_ready': | |
await self.handle_player_ready() | |
else: | |
logger.warning(f"Unknown message type: {message_type}") | |
async def player_joined(self, event): | |
await self.send(text_data=json.dumps({ | |
'type': 'player_joined', | |
'player': event['player'] | |
})) | |
async def player_left(self, event): | |
await self.send(text_data=json.dumps({ | |
'type': 'player_left', | |
'player': event['player'] | |
})) | |
async def handle_submit_answer(self, data): | |
""" Handle player answer submission. """ | |
try: | |
if self.current_stage == 'answering': | |
participant, _ = await quiz_db_service.get_or_create_participant(self.session_id, self.scope['user']) | |
if not participant: | |
logger.error("Failed to get or create participant") | |
return | |
logger.info(f"Processing answer submission for participant {participant.id}") | |
await self.submit_answer(data['question_id'], data['answer_id']) | |
# Mark this participant as having submitted their answer | |
self.submitted_answers[participant.id] = True | |
logger.info(f"Player {participant.id} has submitted their answer.") | |
# Check if all players have submitted their answers | |
if await self.all_players_submitted(): | |
logger.info("All players have submitted, calling time_up") | |
await self.time_up() | |
else: | |
logger.warning(f"Received answer submission during {self.current_stage} stage. Ignoring.") | |
except Exception as e: | |
logger.error(f"Error in handle_submit_answer: {str(e)}", exc_info=True) | |
raise | |
async def all_players_submitted(self): | |
""" Check if all players in the game have submitted their answers. """ | |
try: | |
participants = await quiz_db_service.get_participants(self.session_id) | |
for participant in participants: | |
if participant.id not in self.submitted_answers or not self.submitted_answers[participant.id]: | |
return False | |
return True | |
except Exception as e: | |
logger.error(f"Error checking all players submitted: {str(e)}") | |
return False | |
async def all_players_ready(self, event): | |
await self.send(text_data=json.dumps({ | |
'type': 'all_players_ready' | |
})) | |
# Ensure quiz only starts after all players are ready | |
if self.is_multiplayer: | |
self.current_stage = 'show_countdown' | |
# Use timer manager for countdown | |
await self.timer_manager.start_countdown_timer( | |
duration=3, | |
send_update=lambda remaining: self.send(text_data=json.dumps({ | |
'type': 'timer_update', | |
'stage': self.current_stage, | |
'duration': remaining | |
})), | |
on_complete=self.start_quiz | |
) | |
else: | |
await self.start_quiz() | |
async def start_quiz(self): | |
logger.info("Starting quiz.") | |
quiz_session = await quiz_db_service.get_quiz_session(self.session_id) | |
if quiz_session.is_completed: | |
await self.send(text_data=json.dumps({ | |
'type': 'error', | |
'message': 'This quiz has already been completed.' | |
})) | |
return | |
# Set the quiz as started in the database | |
quiz_session.is_started = True | |
await quiz_db_service.save_quiz_session(quiz_session) | |
# Send the initial quiz state | |
await self.send_quiz_state() | |
# Start the quiz by sending the first question | |
await self.send_next_question() | |
async def send_next_question(self): | |
try: | |
quiz_session = await quiz_db_service.get_quiz_session(self.session_id) | |
quiz_session_question = await quiz_db_service.get_next_question(quiz_session, self.total_questions, self.current_question_number) | |
if quiz_session_question and self.current_question_number < self.total_questions: | |
self.current_question_number += 1 | |
question_data = await quiz_db_service.get_current_question(quiz_session, self.current_question_number) | |
await self.send(text_data=json.dumps({ | |
'type': 'question', | |
'question_number': self.current_question_number, | |
'total_questions': self.total_questions, | |
'current_question': question_data, | |
'is_multiplayer': self.is_multiplayer | |
})) | |
await self.start_round_intro() | |
else: | |
logger.info("No more questions available or reached total questions, sending quiz end.") | |
await self.send_quiz_end() | |
except Exception as e: | |
logger.error(f"Error in send_next_question: {str(e)}") | |
await self.send(text_data=json.dumps({ | |
'type': 'error', | |
'message': 'An error occurred while fetching the next question.' | |
})) | |
async def handle_player_ready(self): | |
logger.info("Handle player ready.") | |
if self.is_multiplayer: | |
participant, created = await quiz_db_service.get_or_create_participant(self.session_id, user=self.scope['user']) | |
if participant: | |
success = await quiz_db_service.update_player_ready_status(participant.id, True) | |
serialized_participant = await quiz_db_service.serialize_participant(participant) | |
if success: | |
# Broadcast the player_ready event to the entire room, not just the sending client | |
await self.channel_layer.group_send( | |
self.room_group_name, | |
{ | |
'type': 'broadcast_player_ready', | |
'player': serialized_participant | |
} | |
) | |
logger.info("Player ready.") | |
# Check if enough players are ready | |
enough_players_ready = await quiz_db_service.check_players_ready(self.session_id) | |
if enough_players_ready: | |
logger.info("Enough players ready.") | |
await self.channel_layer.group_send( | |
self.room_group_name, | |
{ | |
'type': 'all_players_ready' | |
} | |
) | |
else: | |
logger.info("Waiting for more players to be ready.") | |
else: | |
logger.info("Could not update player ready status.") | |
await self.send(text_data=json.dumps({ | |
'type': 'error', | |
'message': 'Failed to update ready status' | |
})) | |
else: | |
logger.info("Player not found.") | |
await self.send(text_data=json.dumps({ | |
'type': 'error', | |
'message': 'Participant not found' | |
})) | |
else: | |
await self.start_quiz() | |
async def player_ready(self, event): | |
# Broadcast the player_ready event to all clients in the group | |
await self.channel_layer.group_send( | |
self.room_group_name, | |
{ | |
'type': 'broadcast_player_ready', | |
'player': event['player'] | |
} | |
) | |
async def broadcast_player_ready(self, event): | |
await self.send(text_data=json.dumps({ | |
'type': 'player_ready', | |
'player': event['player'] | |
})) | |
async def start_round_intro(self): | |
self.current_stage = 'show_round' | |
await self.send_timer_update('show_round', 1.5) | |
await self.timer_manager.start_timer(1.5, self.show_question) | |
async def show_question(self): | |
self.current_stage = 'show_question' | |
await self.send_timer_update('show_question', 1.5) | |
await self.timer_manager.start_timer(1.5, self.start_answering) | |
async def start_answering(self): | |
""" Start answering timer and wait for answers from all participants. """ | |
await self.timer_manager.cancel_timer() | |
self.current_stage = 'answering' | |
self.question_start_time = time.time() | |
self.submitted_answers = {} | |
await self.timer_manager.start_answering_timer( | |
duration=10, | |
time_up_callback=self.time_up, | |
all_submitted_check=self.all_players_submitted | |
) | |
async def broadcast_timer_update(self, event): | |
"""Handler for broadcast timer updates""" | |
await self.send_timer_update(event['stage'], event['duration']) | |
async def handle_timer_update(self, event): | |
"""Handle timer updates received through the channel layer""" | |
await self.send_timer_update(event['stage'], event['duration']) | |
async def time_up(self): | |
"""Proceed to the next step once all answers are in or time is up.""" | |
logger.info("Time up called") | |
# Cancel any existing timer | |
await self.timer_manager.cancel_timer() | |
# Notify all participants of the end of the round | |
await self.broadcast_question_completed() | |
# Proceed to showing the correct answer | |
await self.show_correct_answer() | |
async def send_timer_update(self, stage, duration): | |
await self.send(text_data=json.dumps({ | |
'type': 'timer_update', | |
'stage': stage, | |
'duration': duration | |
})) | |
async def broadcast_question_completed(self): | |
""" Notify all participants that the question is completed. """ | |
scores = await quiz_db_service.get_total_scores(self.session_id, self.scope['user'], is_multiplayer=self.is_multiplayer) | |
# Different response structure for single vs multiplayer | |
response_data = { | |
'type': 'question_completed', | |
'question_id': self.current_question_number, | |
} | |
if self.is_multiplayer: | |
response_data['scores'] = { | |
'your_score': scores['your_score'], | |
'opponent_score': scores['opponent_score'] | |
} | |
else: | |
response_data['scores'] = { | |
'your_score': scores['your_score'] | |
} | |
await self.channel_layer.group_send( | |
self.room_group_name, | |
{ | |
'type': 'broadcast_answer', | |
'data': response_data | |
} | |
) | |
async def broadcast_answer(self, event): | |
""" Broadcast the question completed status to all participants. """ | |
await self.send(text_data=json.dumps(event['data'])) | |
async def submit_answer(self, question_id, answer_id): | |
await self.timer_manager.cancel_timer() | |
if self.question_start_time: | |
time_taken = time.time() - self.question_start_time | |
else: | |
time_taken = 10 | |
is_correct, score = await self.save_user_answer(question_id, answer_id, time_taken) | |
scores = await quiz_db_service.get_total_scores(self.session_id, self.scope['user'], is_multiplayer=self.is_multiplayer) | |
# Different response structure for single vs multiplayer | |
response_data = { | |
'type': 'question_completed', | |
'question_id': question_id, | |
} | |
if self.is_multiplayer: | |
response_data['scores'] = { | |
'your_score': scores['your_score'], | |
'opponent_score': scores['opponent_score'] | |
} | |
else: | |
response_data['scores'] = { | |
'your_score': scores['your_score'] | |
} | |
if self.is_multiplayer: | |
await self.channel_layer.group_send( | |
self.room_group_name, | |
{ | |
'type': 'broadcast_answer', | |
'data': response_data | |
} | |
) | |
else: | |
await self.send(text_data=json.dumps(response_data)) | |
await self.show_correct_answer() | |
async def show_correct_answer(self): | |
self.current_stage = 'show_correct_answer' | |
await self.send_timer_update('show_correct_answer', 3) | |
await self.timer_manager.cancel_timer() | |
if self.current_question_number < self.total_questions: | |
await self.timer_manager.start_timer(3, self.send_next_question) | |
else: | |
await self.timer_manager.start_timer(3, self.send_quiz_end) | |
async def send_quiz_end(self): | |
results = await self.end_quiz() | |
# This broadcasts to all consumers in the group, not to clients directly | |
await self.channel_layer.group_send( | |
self.room_group_name, | |
{ | |
'type': 'quiz_ended', | |
'results': results | |
} | |
) | |
async def quiz_ended(self, event): | |
# This sends the actual WebSocket message to the client | |
# This is the only message the client receives | |
await self.send(text_data=json.dumps({ | |
'type': 'quiz_ended', | |
'results': event['results'] | |
})) | |
async def resume_quiz_flow(self): | |
if self.current_stage == 'show_round': | |
await self.start_round_intro() | |
elif self.current_stage == 'show_question': | |
await self.show_question() | |
elif self.current_stage == 'answering': | |
await self.start_answering() | |
elif self.current_stage == 'show_correct_answer': | |
await self.show_correct_answer() | |
else: | |
logger.warning(f"Unknown stage {self.current_stage}, starting from the beginning") | |
await self.start_round_intro() | |
@database_sync_to_async | |
def save_user_answer(self, question_id, answer_id, time_taken): | |
try: | |
quiz_session = QuizSession.objects.get(id=self.session_id) | |
quiz_session_question = QuizSessionQuestion.objects.get(quiz_session=quiz_session, question_id=question_id) | |
participant = QuizParticipant.objects.get(quiz_session=quiz_session, user=self.scope["user"]) | |
if answer_id: | |
answer = Answer.objects.get(answer_id=answer_id) | |
is_correct = answer.is_correct | |
else: | |
is_correct = False | |
# Calculate score based on the difficulty and remaining time | |
if quiz_session.difficulty == 'easy': | |
base_score = max(1, 20 - time_taken) | |
elif quiz_session.difficulty == 'medium': | |
base_score = max(5, 30 - time_taken) | |
elif quiz_session.difficulty == 'hard': | |
base_score = max(10, 40 - time_taken) | |
else: | |
base_score = max(5, 30 - time_taken) | |
# Apply streak and bonuses | |
if self.current_question_number == self.total_questions: | |
base_score *= 2 | |
base_score += 20 - time_taken | |
if is_correct: | |
self.streak += 1 | |
if self.streak == 3: | |
base_score += 5 | |
elif self.streak == 5: | |
base_score += 10 | |
elif self.streak >= 10: | |
base_score += 20 | |
else: | |
self.streak = 0 | |
# Save the answer and update the score | |
UserAnswer.objects.create( | |
quiz_participant=participant, | |
quiz_session_question=quiz_session_question, | |
answer_id=answer_id if answer_id is not None else None, | |
is_correct=is_correct, | |
time_taken=time_taken, | |
score=base_score | |
) | |
participant.score += base_score | |
participant.save() | |
return is_correct, base_score | |
except ObjectDoesNotExist as e: | |
logger.error(f"Error in save_user_answer: {str(e)}") | |
return None, 0 | |
@database_sync_to_async | |
def end_quiz(self): | |
quiz_session = QuizSession.objects.get(id=self.session_id) | |
participants = QuizParticipant.objects.filter(quiz_session=quiz_session) | |
logger.info("End Quiz triggered") | |
highest_score = 0 | |
winners = [] | |
# Calculate the highest score and identify winners | |
for participant in participants: | |
correct_answers = UserAnswer.objects.filter( | |
quiz_participant=participant, is_correct=True | |
).count() | |
participant.correct_answers = correct_answers | |
if participant.score > highest_score: | |
highest_score = participant.score | |
winners = [participant] | |
elif participant.score == highest_score: | |
winners.append(participant) | |
# Apply victory bonus to winners and save participants | |
victory_bonus = 50 | |
for winner in winners: | |
winner.score += victory_bonus | |
winner.save() | |
# Mark quiz as completed | |
quiz_session.is_completed = True | |
quiz_session.save() | |
# Add XP for each participant and handle level-up logic | |
for participant in participants: | |
# Calculate XP based on the quiz score or performance criteria | |
base_xp = participant.score | |
if participant in winners: | |
base_xp += victory_bonus # Additional XP for winners | |
profile, created = XPProfile.objects.get_or_create(user=participant.user) | |
profile.add_xp(base_xp) # This will handle level-up if needed | |
# Update Leaderboard with points | |
self.update_leaderboards(participant.user, quiz_session.category, base_xp) | |
# Serialize and return the session results | |
detailed_serializer = DetailedQuizSessionSerializer(quiz_session) | |
result_data = detailed_serializer.data | |
return result_data | |
async def send_quiz_state(self): | |
quiz_session = await quiz_db_service.get_quiz_session(self.session_id) | |
current_question = await quiz_db_service.get_current_question(quiz_session, self.current_question_number) | |
if self.total_questions == 0: | |
self.total_questions = await quiz_db_service.get_total_questions(self.session_id) | |
state_data = { | |
'type': 'quiz_state', | |
'question_number': self.current_question_number, | |
'total_questions': self.total_questions, | |
'current_stage': self.current_stage, | |
'is_multiplayer': self.is_multiplayer | |
} | |
if current_question: | |
state_data['current_question'] = current_question | |
else: | |
state_data['message'] = 'No current question available.' | |
if self.is_multiplayer: | |
state_data['players'] = await quiz_db_service.get_players_status(self.session_id) | |
await self.send(text_data=json.dumps(state_data)) | |
async def user_disconnect(self, event): | |
await self.send(text_data=json.dumps({ | |
'type': 'user_disconnected', | |
'user_id': event['user_id'], | |
'username': event['username'], | |
'avatar': event['avatar'] | |
})) | |
async def user_reconnected(self, event): | |
await self.send(text_data=json.dumps({ | |
'type': 'user_reconnected', | |
'user_id': event['user_id'], | |
'username': event['username'], | |
'avatar': event['avatar'] | |
})) | |
async def quiz_abandoned(self, event): | |
await self.send(text_data=json.dumps({ | |
'type': 'quiz_abandoned', | |
'message': event['message'] | |
})) | |
def update_leaderboards(self, user, category, points): | |
current_date = datetime.now() | |
year = current_date.year | |
month = current_date.month | |
with transaction.atomic(): | |
# Update MonthlyLeaderboardScore | |
monthly_score, created = MonthlyLeaderboardScore.objects.get_or_create( | |
user=user, category=category, year=year, month=month, | |
defaults={'score': 0} | |
) | |
monthly_score.score += points | |
monthly_score.save() | |
# Update AllTimeLeaderboardScore | |
all_time_score, created = AllTimeLeaderboardScore.objects.get_or_create( | |
user=user, category=category, | |
defaults={'score': 0} | |
) | |
all_time_score.score += points | |
all_time_score.save() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
from typing import Optional, Callable | |
logger = logging.getLogger(__name__) | |
class QuizTimerManager: | |
def __init__(self, channel_layer=None, room_group_name=None): | |
self.timer_task: Optional[asyncio.Task] = None | |
self.channel_layer = channel_layer | |
self.room_group_name = room_group_name | |
self.lock = asyncio.Lock() | |
def set_channel_info(self, channel_layer, room_group_name): | |
"""Update channel layer information""" | |
self.channel_layer = channel_layer | |
self.room_group_name = room_group_name | |
async def start_timer(self, duration: float, next_stage: Callable) -> None: | |
"""Start a basic timer that executes the next_stage callback after duration""" | |
async with self.lock: | |
await self.cancel_timer() | |
self.timer_task = asyncio.create_task(self._timer(duration, next_stage)) | |
async def start_countdown_timer(self, duration: int, send_update: Callable, on_complete: Callable) -> None: | |
"""Start a countdown timer that sends updates each second | |
Args: | |
duration: Time in seconds for countdown | |
send_update: Callback to send updates to client | |
on_complete: Callback to execute when countdown finishes | |
""" | |
await self.cancel_timer() | |
self.timer_task = asyncio.create_task(self._countdown_timer(duration, send_update, on_complete)) | |
async def _countdown_timer(self, duration: int, send_update: Callable, on_complete: Callable) -> None: | |
"""Internal countdown timer implementation with updates each second""" | |
try: | |
for i in range(duration, 0, -1): | |
await send_update(i) | |
await asyncio.sleep(1) | |
await on_complete() | |
except asyncio.CancelledError: | |
logger.info("Countdown timer cancelled") | |
raise | |
except Exception as e: | |
logger.error(f"Error in countdown timer: {str(e)}", exc_info=True) | |
raise | |
async def start_answering_timer(self, duration: int, time_up_callback: Callable, | |
all_submitted_check: Callable) -> None: | |
""" | |
Start the answering phase timer with periodic updates | |
Args: | |
duration: Time in seconds for answering | |
time_up_callback: Callback to execute when time is up | |
all_submitted_check: Function to check if all players submitted | |
""" | |
await self.cancel_timer() | |
self.timer_task = asyncio.create_task( | |
self._answering_timer(duration, time_up_callback, all_submitted_check) | |
) | |
async def _timer(self, duration: float, next_stage: Callable) -> None: | |
"""Internal timer implementation""" | |
try: | |
await asyncio.sleep(duration) | |
await next_stage() | |
except asyncio.CancelledError: | |
logger.info("Timer cancelled") | |
raise | |
except Exception as e: | |
logger.error(f"Error in timer: {str(e)}", exc_info=True) | |
raise | |
async def _answering_timer(self, duration: int, time_up_callback: Callable, | |
all_submitted_check: Callable) -> None: | |
"""Internal answering timer implementation with periodic updates""" | |
try: | |
remaining_time = duration | |
logger.info(f"Starting answering timer for {duration} seconds") | |
while remaining_time > 0: | |
# Send timer update to all participants | |
if self.channel_layer and self.room_group_name: | |
await self.channel_layer.group_send( | |
self.room_group_name, | |
{ | |
'type': 'broadcast_timer_update', | |
'stage': 'answering', | |
'duration': remaining_time | |
} | |
) | |
# Check if all players submitted their answers | |
if await all_submitted_check(): | |
logger.info("All players submitted their answers") | |
break | |
await asyncio.sleep(1) | |
remaining_time -= 1 | |
# Call time_up callback when timer ends | |
await time_up_callback() | |
except asyncio.CancelledError: | |
logger.info("Answering timer cancelled") | |
raise | |
except Exception as e: | |
logger.error(f"Error in answering timer: {str(e)}", exc_info=True) | |
raise | |
async def cancel_timer(self) -> None: | |
"""Cancel any running timer""" | |
try: | |
if self.timer_task and not self.timer_task.done(): | |
logger.info("Cancelling timer task") | |
self.timer_task.cancel() | |
try: | |
await self.timer_task | |
except asyncio.CancelledError: | |
logger.info("Timer task cancelled successfully") | |
except Exception as e: | |
logger.error(f"Error while awaiting cancelled timer: {str(e)}", exc_info=True) | |
self.timer_task = None | |
except Exception as e: | |
logger.error(f"Error in cancel_timer: {str(e)}", exc_info=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment