Last active
October 17, 2021 19:50
-
-
Save septatrix/e9f8f967c13765863602469c725766c4 to your computer and use it in GitHub Desktop.
Moodle group import
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
.venv | |
RWTHonline.csv |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import configparser | |
import csv | |
import logging | |
import sys | |
from collections import defaultdict | |
from itertools import groupby | |
from pathlib import Path | |
from pprint import pformat | |
from typing import Dict, Iterable, List, NamedTuple, Sequence, TypedDict, TypeVar | |
from moodle.session import MoodleSession | |
# Configure the root logger so INFO-level (and higher) records are emitted.
logging.basicConfig(level=logging.INFO)
# Inclusive bounds assign_groups uses to decide whether a team is kept as-is.
MIN_GROUP_SIZE = 4
MAX_GROUP_SIZE = 4
# Chunk size assign_groups uses when splitting leftover users into new teams.
PREFERRED_GROUP_SIZE = 4
class User(NamedTuple):
    """A participant found both in the RWTHonline export and in Moodle.

    The field order is significant: export_groups writes the fields
    positionally to CSV and import_groups reads them back in this order.
    """

    # Full name as reported by Moodle (the "fullname" field).
    fullname: str
    # Team name from the RWTHonline export (the "TEAMNAME" column).
    group: str
    # Matriculation number (the Moodle "idnumber" field).
    matriculation_number: str
    # Moodle user id (the Moodle "id" field).
    moodle_id: int
class Config(TypedDict):
    """Settings that load_config reads from the single section of sites.conf."""

    # Base URL of the Moodle instance; first argument to MoodleSession.
    wwwroot: str
    # Web service token; second argument to MoodleSession.
    token: str
    # Course id, kept as the string read from the config file.
    courseid: str
S = TypeVar("S", bound=Sequence)


def chunks(sequence: S, n: int) -> Iterable[S]:
    """Lazily split ``sequence`` into consecutive slices of length ``n``.

    Slices are produced in order; the final slice is shorter when the
    length of ``sequence`` is not a multiple of ``n``. An empty sequence
    produces no slices.
    """
    offsets = range(0, len(sequence), n)
    for lower in offsets:
        upper = lower + n
        yield sequence[lower:upper]  # type: ignore
def load_config() -> Config:
    """Load the Moodle URL, token and course id from ``sites.conf``.

    The file is in INI format, is looked up relative to the current working
    directory and must contain exactly one section, for example::

        [Moodle]
        wwwroot = https://moodle.rwth-aachen.de
        token = <token with the required permissions>
        courseid = 13820

    Returns:
        The three settings as a ``Config`` mapping (all values are strings).

    Raises:
        SystemExit: With status 1, after logging a critical message, when
            the file cannot be read, does not contain exactly one section,
            or lacks one of the required options.
    """
    config = configparser.ConfigParser()
    if not config.read("sites.conf"):
        logging.critical("Could not find sites.conf, aborting!")
        sys.exit(1)

    # Zero sections is just as invalid as several, so report the actual count
    # instead of always claiming there are too many.
    sections = config.sections()
    if len(sections) != 1:
        logging.critical(
            f"Expected exactly one section in sites.conf, found {len(sections)},"
            " aborting!"
        )
        sys.exit(1)
    sectionname = sections[0]
    section = config[sectionname]

    try:
        return Config(
            wwwroot=section["wwwroot"],
            token=section["token"],
            courseid=section["courseid"],
        )
    except KeyError as e:
        # Mapping-style access on a section raises KeyError for a missing
        # option; configparser.NoOptionError is only raised by the get() API.
        logging.critical(
            f"Option {e.args[0]} not found in {sectionname}, aborting!"
        )
        sys.exit(1)
def load_users(
    moodle_session: MoodleSession, rwth_export: Path, courseid: str
) -> List[User]:
    """Match the RWTHonline CSV export against the users enrolled in Moodle.

    Users are matched on the matriculation number (``MATRIKELNUMMER`` column
    of the export, ``idnumber`` field in Moodle). Users that appear in only
    one of the two sources are reported as warnings and left out.

    Args:
        moodle_session: Session used to call the Moodle web services.
        rwth_export: Path of the semicolon-separated RWTHonline export.
        courseid: Id of the Moodle course whose enrolled users are fetched.

    Returns:
        One ``User`` per matriculation number present in both sources.
    """
    # The encoding and delimiter work around Excel's CSV flavour.
    with rwth_export.open(newline="", encoding="utf_8_sig") as export_file:
        rwth_users = {}
        for row in csv.DictReader(export_file, delimiter=";"):
            # .strip() also removes the non-breaking space around the number.
            rwth_users[row["MATRIKELNUMMER"].strip()] = row
    logging.info(f"Loaded {len(rwth_users)} user(s) from RWTHonline")

    # First fetch the ids of everyone enrolled, then their full user records.
    enrolled = moodle_session.webservice(
        "core_enrol_get_enrolled_users",
        data={"courseid": courseid},
    )
    moodle_user_ids = [entry["id"] for entry in enrolled]  # FIXME revert to use ID
    records = moodle_session.webservice(
        "core_user_get_users_by_field",
        data={
            "field": "id",  # FIXME revert to use ID
            "values": moodle_user_ids,
        },
    )
    moodle_users = {record["idnumber"]: record for record in records}
    logging.info(f"Loaded {len(moodle_users)} user(s) from Moodle")

    rwth_only = {number for number in rwth_users if number not in moodle_users}
    if rwth_only:
        logging.warning(f"Found {len(rwth_only)} RWTHonline user(s) not in Moodle")
        report = []
        for matrikelnummer, user in rwth_users.items():
            if matrikelnummer in rwth_only:
                report.append(
                    f"Matrikelnummer {matrikelnummer} - {user['NACHNAME']}, {user['VORNAME']}"
                )
        logging.warning(pformat(report))

    moodle_only = {number for number in moodle_users if number not in rwth_users}
    if moodle_only:
        logging.warning(f"Found {len(moodle_only)} Moodle user(s) not in RWTHonline")
        report = []
        for matrikelnummer, user in moodle_users.items():
            if matrikelnummer in moodle_only:
                report.append(f"Matrikelnummer {matrikelnummer} - {user['fullname']}")
        logging.warning(pformat(report))

    matched = []
    for number in set(rwth_users).intersection(moodle_users):
        record = moodle_users[number]
        matched.append(
            User(
                fullname=record["fullname"],
                matriculation_number=record["idnumber"],
                moodle_id=record["id"],
                group=rwth_users[number]["TEAMNAME"],
            )
        )
    return matched
def assign_groups(users: List[User]) -> Dict[str, List[User]]:
    """Turn the self-chosen team names into final, numbered teams.

    Teams whose size lies within ``MIN_GROUP_SIZE``..``MAX_GROUP_SIZE`` are
    kept unchanged. The remaining teams are ordered by size and joined in
    consecutive pairs; a joined pair that fits the bounds becomes a team,
    otherwise its members are pooled. The pool is finally cut into chunks of
    ``PREFERRED_GROUP_SIZE`` users (the last chunk may be smaller).

    Returns:
        A mapping from ``"Team <i>"`` (numbered from 1) to its members.
    """
    # Bucket users by their requested team, keeping the input order per team.
    requested: Dict[str, List[User]] = {}
    for user in users:
        requested.setdefault(user.group, []).append(user)

    valid_groups = []
    invalid_groups = []
    for group_name in sorted(requested):
        members = requested[group_name]
        if MIN_GROUP_SIZE <= len(members) <= MAX_GROUP_SIZE:
            valid_groups.append(members)
        else:
            logging.info(
                f"Size of group '{group_name}' not within bounds"
                f" ({MIN_GROUP_SIZE} <= {len(members)} <= {MAX_GROUP_SIZE})"
            )
            invalid_groups.append(members)

    # Join the out-of-bounds teams pairwise, smallest first; an odd team at
    # the end is considered on its own.
    unassigned_users = []
    for pair in chunks(sorted(invalid_groups, key=len), 2):
        joined_group = [user for team in pair for user in team]
        if MIN_GROUP_SIZE <= len(joined_group) <= MAX_GROUP_SIZE:
            valid_groups.append(joined_group)
        else:
            unassigned_users.extend(joined_group)

    valid_groups.extend(chunks(unassigned_users, PREFERRED_GROUP_SIZE))

    numbered = {}
    for i, group in enumerate(valid_groups, start=1):
        numbered[f"Team {i}"] = group
    return numbered
def export_groups(groups: Dict[str, List[User]], csv_output: Path) -> None:
    """Write the teams to ``csv_output``, one row per member.

    Each row holds the team name followed by the member's ``User`` fields in
    declaration order. No header row is written.
    """
    with csv_output.open("w", newline="", encoding="utf8") as out_file:
        csv.writer(out_file).writerows(
            (team_name, *member)
            for team_name, team_members in groups.items()
            for member in team_members
        )
def assign_groups_cli(rwth_export: Path, csv_output: Path) -> None:
    """Run the ``assign-groups`` command: load users, form teams, write CSV."""
    settings = load_config()
    session = MoodleSession(settings["wwwroot"], settings["token"])
    matched_users = load_users(session, rwth_export, settings["courseid"])
    # Replace the custom team names with numbered teams and export them.
    export_groups(assign_groups(matched_users), csv_output)
def import_groups(csv_input: Path) -> Dict[str, List[User]]:
    """Read teams back from a CSV file in the format export_groups writes.

    Each non-blank row must hold exactly five columns: Moodle group name,
    full name, original team name, matriculation number and Moodle user id.

    Args:
        csv_input: Path of the CSV file to read.

    Returns:
        A plain dict mapping each Moodle group name to its members, in file
        order.

    Raises:
        ValueError: If a non-blank row does not have exactly five columns or
            its last column is not an integer.
    """
    groups: Dict[str, List[User]] = {}
    with csv_input.open(newline="", encoding="utf8") as f:
        for row in csv.reader(f):
            if not row:
                # csv.reader yields an empty list for a blank line (easily
                # introduced when the file is edited by hand); skip it
                # instead of failing on the unpacking below.
                continue
            moodle_group, fullname, group, matriculation_number, moodle_id = row
            groups.setdefault(moodle_group, []).append(
                User(fullname, group, matriculation_number, int(moodle_id))
            )
    # A plain dict (rather than a defaultdict) so that later lookups of an
    # unknown group name fail loudly instead of silently creating an entry.
    return groups
def upload_groups(
    moodle_session: MoodleSession, groups: Dict[str, List[User]], courseid: str
) -> None:
    """Create any missing Moodle groups and add all members to their group.

    Args:
        moodle_session: Session used to call the Moodle web services.
        groups: Mapping from Moodle group name to the users to put in it.
        courseid: Id of the course the groups belong to.
    """
    # Group name -> Moodle group id for every group the course already has.
    existing_groups: Dict[str, int] = {}
    for entry in moodle_session.webservice(
        "core_group_get_course_groups", data={"courseid": courseid}
    ):
        existing_groups[entry["name"]] = entry["id"]

    # Create missing groups and remember the ids Moodle reports for them.
    missing_groups = set(groups).difference(existing_groups)
    if missing_groups:
        new_groups = [
            {"courseid": courseid, "name": name, "description": ""}
            for name in missing_groups
        ]
        created = moodle_session.webservice(
            "core_group_create_groups",
            data={"groups": new_groups},
        )
        created_groups: Dict[str, int] = {
            entry["name"]: entry["id"] for entry in created
        }
        existing_groups.update(created_groups)

    # Assign group membership in a single request.
    memberships = []
    for groupname, members in groups.items():
        group_id = existing_groups[groupname]
        for user in members:
            memberships.append({"groupid": group_id, "userid": user.moodle_id})
    moodle_session.webservice(
        "core_group_add_group_members",
        data={"members": memberships},
    )
def upload_groups_cli(csv_input: Path) -> None:
    """Run the ``upload-groups`` command: read the CSV and push it to Moodle."""
    # The CSV is read before the configuration is loaded.
    teams = import_groups(csv_input)
    settings = load_config()
    session = MoodleSession(settings["wwwroot"], settings["token"])
    upload_groups(session, teams, settings["courseid"])
def entrypoint() -> None:
    """Parse the command line and run the selected subcommand.

    Subcommands:
        assign-groups: Build teams from the RWTHonline export given with
            ``--rwth-export`` and write them to ``--output`` (default
            ``groups.csv``).
        upload-groups: Read teams from ``--input`` (default ``groups.csv``)
            and upload them to Moodle.

    Raises:
        SystemExit: Raised by argparse on invalid arguments, including a
            missing subcommand.
    """
    parser = argparse.ArgumentParser(
        description="Assign groups based on preferences and upload them to Moodle",
        allow_abbrev=False,
    )
    # required=True makes argparse reject an invocation without a subcommand;
    # otherwise the script would exit successfully without doing anything.
    subparser = parser.add_subparsers(dest="command", required=True)

    assignementcli = subparser.add_parser(
        "assign-groups", help="Use the RWTHonline export to assign users into groups"
    )
    assignementcli.add_argument("--output", default=Path("groups.csv"), type=Path)
    assignementcli.add_argument("--rwth-export", required=True, type=Path)

    uploadcli = subparser.add_parser(
        "upload-groups", help="Load groups from CSV and upload them to Moodle"
    )
    uploadcli.add_argument("--input", default=Path("groups.csv"), type=Path)

    args = parser.parse_args()
    if args.command == "assign-groups":
        assign_groups_cli(args.rwth_export, args.output)
    elif args.command == "upload-groups":
        upload_groups_cli(args.input)


if __name__ == "__main__":
    entrypoint()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[Moodle] | |
wwwroot = https://moodle.rwth-aachen.de | |
token = <REDACTED> | |
courseid = 17070 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment