Last active
September 11, 2019 11:12
-
-
Save firehawk895/84a97bfec8bad7cc8db8f8d34975dac1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from networkx.readwrite import json_graph | |
from rest_framework.exceptions import ValidationError | |
from rest_framework.fields import IntegerField | |
from utility.generics import get_lazy_dict_from_queryset | |
def get_merged_rubric_tree(question, required_keys, evaluation=None):
    """
    Merge rubric meta data (and, optionally, a student's scores) into question.rubrics.

    Every node below the root receives one entry per name in ``required_keys``.
    The root itself receives "max_marks" and, when an evaluation is given, "score".
    The result can be used in GET evaluation details.

    NOTE: the tree held by ``question.rubrics`` is updated in place and that same
    object is returned.

    :param question: question model object; ``rubrics``, ``max_marks`` and
        ``get_rubric_information_nodes()`` are read from it
    :param required_keys: list of required keys - subset of ["name", "rubric", "max_marks",
        "grading_guidelines", "grading_guideline", "score", "comment", "comments", "correctness"]
    :param evaluation: corresponding evaluation of the student; if falsy, the student
        related keys ("score", "comment", "comments", "correctness") are injected as None
        and the root gets no "score"
    :return: the merged rubric tree
    :raises KeyError: if ``required_keys`` names an unsupported key or a node has no "id"
    """
    rubric_tree_root = question.rubrics
    # Truthiness (instead of len()) also tolerates max_marks being None.
    max_marks = question.max_marks[0] if question.max_marks else None

    rins = question.get_rubric_information_nodes()
    # presumably keyed by the node's id - verify against get_lazy_dict_from_queryset
    rins_dict = dict(get_lazy_dict_from_queryset(rins))

    score = None
    rubric_scores_dict = {}
    if evaluation:
        score = evaluation.question_session.score
        rubric_scores = evaluation.rubric_scores.all()
        rubric_scores_dict = dict(
            get_lazy_dict_from_queryset(rubric_scores, "rubric_node_information_id")
        )

    def merge_rubric_tree(children_list):
        for node in children_list:
            node_id = node["id"]
            rin_obj = rins_dict.get(node_id)
            # Stays None when there is no evaluation or no score for this node;
            # getattr(None, ..., None) below then yields None for every student key.
            rubric_score_obj = rubric_scores_dict.get(node_id)
            # Several keys are aliases of the same attribute ("name"/"rubric",
            # "grading_guidelines"/"grading_guideline", "comment"/"comments").
            attribute_map = {
                "name": getattr(rin_obj, 'name', None),
                "rubric": getattr(rin_obj, 'name', None),
                "max_marks": getattr(rin_obj, 'max_marks', None),
                "grading_guidelines": getattr(rin_obj, 'grading_guideline', None),
                "grading_guideline": getattr(rin_obj, 'grading_guideline', None),
                "score": getattr(rubric_score_obj, 'score', None),
                "comment": getattr(rubric_score_obj, 'comment', None),
                "comments": getattr(rubric_score_obj, 'comment', None),
                "correctness": getattr(rubric_score_obj, 'correctness', None),
            }
            for key in required_keys:
                node[key] = attribute_map[key]
            # recurse on children
            if "children" in node:
                merge_rubric_tree(node["children"])

    if "children" in rubric_tree_root:
        merge_rubric_tree(rubric_tree_root.get('children'))

    rubric_tree_root["max_marks"] = max_marks
    if evaluation:
        rubric_tree_root["score"] = score
    return rubric_tree_root
def process_grades_for_student(tree):
    """
    purpose: hide data from students side rubrics

    Works in place on the given tree:
      * level 1 nodes (children of the root) lose "correctness"
      * level 2 nodes lose "max_marks", "score" and their "children"
        (which removes level 3 entirely)

    Keys that are already absent are skipped instead of raising, so the tree may
    have been merged with any subset of keys and the root may have no children.

    :param tree: get_merged_rubric_tree tree
    :return: the same tree object, stripped
    """
    for level_one_node in tree.get('children') or []:
        level_one_node.pop('correctness', None)
        for level_two_node in level_one_node.get('children') or []:
            for hidden_key in ('max_marks', 'score', 'children'):
                level_two_node.pop(hidden_key, None)
    return tree
def get_netx_obj(rubric_tree):
    """
    Convert a json tree (rather, a nested dict) into a networkx graph object.

    :param rubric_tree: nested dict tree, typically the output of get_merged_rubric_tree
    :return: the object produced by ``json_graph.tree_graph`` for that tree
    """
    return json_graph.tree_graph(rubric_tree)
def get_leaf_nodes(netx_obj):
    """
    given any networkx DiGraph object, returns the leaf node ids

    A leaf is a node with no outgoing edges (out-degree 0).

    ``nodes()`` is used rather than ``nodes_iter()`` because ``nodes_iter()`` was
    removed in networkx 2.0, while ``nodes()`` is iterable on both 1.x and 2.x.

    :param netx_obj: graph exposing ``nodes()`` and ``out_degree(node)``
    :return: list of leaf nodes, in the graph's node iteration order
    """
    # WARNING (from the original author): never request node data here, i.e. never pass
    # True to the node iterator - it was seen to bomb with
    # networkx.exception.NetworkXError: Node {'max_marks': 1000} in the sequence nbunch is not a valid node.
    return [node for node in netx_obj.nodes() if netx_obj.out_degree(node) == 0]
def validate_rubric_score(node, max_marks):
    """
    Raises validation error unless a node's score is valid and not greater than the max_marks

    A missing, None or otherwise falsy score (including 0) is accepted without any check.
    The "score" key is read with ``.get`` so a node that carries no score at all is
    treated the same as an explicit None instead of raising KeyError.

    :param node: rubric node dict; "score" is optional, "id" is needed to report an error
    :param max_marks: upper bound for the score; passed through ``int()``
    :return: None
    :raises ValidationError: keyed by the node id (as a string) when the score is not an
        integer in the range 0..max_marks
    """
    score = node.get("score")
    # support None in the input
    if not score:
        return
    try:
        integer_field = IntegerField(min_value=0, max_value=int(max_marks))
        sanitized_internal_value = integer_field.to_internal_value(score)
        integer_field.run_validators(sanitized_internal_value)
    except ValidationError as e:
        # re-key the error by node id so the caller can tell which node failed
        raise ValidationError({
            str(node["id"]): e.detail
        })
# TODO - restrict max length of a comment as well | |
def validate_comment(node, level):
    """
    Raises validation error if a comment is detected at level 3

    The "comment" key is optional: a node without it (or with an empty/None comment)
    always passes.

    :param node: rubric node dict; "id" is needed to report an error
    :param level: level of the current node (depth of tree)
    :return: None
    :raises ValidationError: keyed by the node id (as a string) when a level 3 node
        carries a non-empty comment
    """
    # original author's note: level = depth -1
    if level == 3 and node.get("comment"):
        raise ValidationError({
            str(node["id"]): "Comment not expected at this level"
        })
def sum_rubric_data_marks(rubric_data, rubric_info_nodes_queryset):
    """
    Depth first traverses rubric tree (json) and calculates sum of every non-leaf node from leaf nodes.

    For every node that has children, the computed total is written to the matching
    rubric information object's ``max_marks`` and that object is saved. If any child
    yields a falsy value (None or 0), the parent's total becomes None and stays None.

    The "has the total been invalidated" test is ``total is not None`` rather than
    ``type(total) is int``: with the type test, any non-int numeric max_marks would
    turn the running total into a non-int after the first addition and wrongly reset
    it to None on the next child.

    :param rubric_data: input rubric data from question (root dict with optional "children")
    :param rubric_info_nodes_queryset: rubric information objects for the nodes in the tree
        (presumably keyed by id through get_lazy_dict_from_queryset - verify)
    :return: sum of the totals of the root's direct children, counting None as 0
    :raises KeyError: if a node id has no matching rubric information object
    """
    rubric_info_nodes_dict = dict(get_lazy_dict_from_queryset(rubric_info_nodes_queryset))

    def validate_and_sum_tree(rubric_tree_root):
        node_obj = rubric_info_nodes_dict[rubric_tree_root["id"]]
        children = rubric_tree_root.get('children')
        if not children:
            # leaf node: its own max_marks is the value to sum
            return node_obj.max_marks

        total = 0
        for child in children:
            # always recurse, even once total is None, so every subtree gets saved
            sub_tree_total = validate_and_sum_tree(child)
            if total is not None and sub_tree_total:
                total += sub_tree_total
            else:
                total = None
        node_obj.max_marks = total
        node_obj.save()
        return total

    total = 0
    for child in rubric_data.get('children', []):
        total += validate_and_sum_tree(child) or 0
    return total
def validate_and_sum_input_rubric_data(rubric_data, rubric_info_nodes_queryset):
    """
    Walk the rubric tree depth first, validate each leaf and roll leaf scores up into
    every node that has children. The computed "score" is injected into those nodes
    (root included) in place.

    :param rubric_data: input rubric data typically from the POST evaluations API
    :param rubric_info_nodes_queryset: rubric information objects used to look up the
        max_marks of each leaf
    :return: the same rubric_data object, with "score" filled in on non-leaf nodes
    """
    # rubric_data structure: {'id': '0', 'children': [{'comment': 'new comment on rubric-1', 'children': [{'comment': None, 'children': [{'comment': None, 'score': 22, 'id': 53}], 'id': 52}, {'comment': 'new comment on rubric-4', 'children': [{'comment': None, 'score': 30, 'id': 55}], 'id': 54}], 'id': 51}, {'comment': None, 'score': 14, 'id': 56}]}
    info_by_node_id = dict(get_lazy_dict_from_queryset(rubric_info_nodes_queryset))

    def score_subtree(current, depth=0):
        """
        :param current: node being visited
        :param depth: level of the node being visited (root is 0)
        :return: score of the subtree rooted at ``current``
        """
        if "children" not in current:
            # leaf node: the only place where score and comment are validated
            leaf_limit = info_by_node_id[current["id"]].max_marks
            validate_rubric_score(current, leaf_limit)
            validate_comment(current, depth)
            return current.get("score") or 0

        subtotal = 0
        for descendant in current["children"]:
            subtotal += score_subtree(descendant, depth + 1)
        current["score"] = subtotal
        return subtotal

    score_subtree(rubric_data)
    # return structure {'score': 66, 'id': '0', 'children': [{'comment': 'new comment on rubric-1', 'score': 52, 'children': [{'comment': None, 'score': 22, 'children': [{'comment': None, 'score': 22, 'id': 53}], 'id': 52}, {'comment': 'new comment on rubric-4', 'score': 30, 'children': [{'comment': None, 'score': 30, 'id': 55}], 'id': 54}], 'id': 51}, {'comment': None, 'score': 14, 'id': 56}]}
    return rubric_data
def persist_rubric_tree(rubric_data, evaluation):
    """
    Store the scores and comments of a rubric tree as RubricScore objects, then push
    the root score to the evaluation's question session.

    Children are persisted before their parent. The root node itself gets no
    RubricScore; only its "score" is forwarded to ``question_session.update_score``.

    :param rubric_data: rubric tree with score and comment meta data
    :param evaluation: the student's corresponding evaluation
    :return: None
    """
    from .models import RubricScore

    def save_subtree(nodes):
        for entry in nodes:
            if "children" in entry:
                save_subtree(entry["children"])
            # TODO: not populating is_leaf_score of RubricScore, do we need it?
            record, _ = RubricScore.objects.get_or_create(
                rubric_node_information_id=entry["id"],
                evaluation_id=evaluation.id)
            # empty or missing comments are stored as None
            record.comment = entry.get("comment") or None
            record.score = entry["score"]
            record.save()

    save_subtree(rubric_data["children"])
    evaluation.question_session.update_score(rubric_data["score"])
# TODO : abstract it out to a Clonable model mixin or something | |
def clone_rubric(rubric_tree):
    """
    Used by the cloning APIs for central (Course cloning)

    Builds a new tree with the same shape as ``rubric_tree``. Every node's
    RubricNodeInformation is cloned and the node id replaced by the clone's id.
    The root sentinel id is kept as is and never looked up.

    The sentinel is recognised both as the integer 0 and as the string '0', since
    the sample payloads elsewhere in this module show the root id as '0'.

    :param rubric_tree: nested dict with "id" and optional "children"; may be empty or None
    :return: new tree of the form {"id": ..., "children": [...]}; {} for an empty/None input
    """
    if not rubric_tree:
        return {}

    old_id = rubric_tree.get('id')
    if old_id in (0, '0'):
        # root sentinel: there is no RubricNodeInformation to clone
        new_id = old_id
    else:
        # imported only where needed, so a sentinel-only tree never touches the models
        from .models import RubricNodeInformation
        new_id = RubricNodeInformation.objects.get(pk=old_id).clone().id

    return {
        'id': new_id,
        'children': [clone_rubric(child) for child in rubric_tree.get('children') or []],
    }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment