Created
May 11, 2021 01:36
-
-
Save bradenmacdonald/930c7655dca32dc648af9cb0aed4a7c5 to your computer and use it in GitHub Desktop.
Open edX API Client Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
API Client used to make requests from Open edX | |
Originally created by OpenCraft for LabXchange, www.labxchange.org | |
You may use this under the terms of the Apache 2 license, | |
https://www.apache.org/licenses/LICENSE-2.0 | |
""" | |
import logging | |
from types import SimpleNamespace | |
from urllib.parse import urlparse, parse_qs | |
import requests | |
from django.conf import settings | |
from django.core.cache import cache | |
from django.db import transaction | |
from oauthlib.oauth2 import BackendApplicationClient, TokenExpiredError | |
from opaque_keys.edx.keys import UsageKey | |
from opaque_keys.edx.locator import LibraryUsageLocatorV2 | |
from rest_framework import serializers | |
from social_django.models import UserSocialAuth | |
from requests_oauthlib import OAuth2Session | |
log = logging.getLogger(__name__) | |
######################################################################################################################## | |
####### Serializers #################################################################################################### | |
class DataSerializer(serializers.Serializer): | |
""" | |
Serializer for data that does not need to be persisted directly. | |
Inherit from it to avoid needing to disable abstract-method warnings. | |
""" | |
def create(self, validated_data): | |
pass | |
def update(self, instance, validated_data): | |
pass | |
class GenericObjectSerializer(DataSerializer): | |
""" | |
Serializer for any response that returns a JSON dict, without specifying | |
the fields of that dict in detail. | |
""" | |
def to_representation(self, instance): | |
return instance | |
def to_internal_value(self, data): | |
return data | |
class Meta: | |
swagger_schema_fields = { | |
"type": "object", | |
"additionalProperties": True, | |
} | |
class ContentLibraryMetadataSerializer(serializers.Serializer): | |
""" | |
Serializer for metadata about a content library | |
""" | |
id = serializers.CharField() | |
org = serializers.SlugField() | |
slug = serializers.SlugField() | |
bundle_uuid = serializers.UUIDField() | |
title = serializers.CharField() | |
description = serializers.CharField(allow_blank=True) | |
version = serializers.IntegerField() | |
has_unpublished_changes = serializers.BooleanField() | |
has_unpublished_deletes = serializers.BooleanField() | |
class LibraryLinkSerializer(serializers.Serializer): | |
""" | |
Serializer for a link from a content library to another blockstore bundle | |
""" | |
id = serializers.CharField() | |
bundle_uuid = serializers.UUIDField() | |
version = serializers.IntegerField() | |
latest_version = serializers.IntegerField() | |
opaque_key = serializers.CharField(allow_blank=True) | |
class LibraryXBlockMetadataSerializer(serializers.Serializer): | |
""" | |
Serializer for metadata about an XBlock in a content library | |
""" | |
id = serializers.CharField() | |
def_key = serializers.CharField() | |
block_type = serializers.CharField() | |
display_name = serializers.CharField(allow_blank=True) | |
has_unpublished_changes = serializers.BooleanField() | |
class LibraryXBlockAssetFileSerializer(serializers.Serializer): | |
""" | |
Serializer for metadata about a static asset file belonging to an XBlock in | |
a content library | |
""" | |
path = serializers.CharField() | |
url = serializers.CharField() | |
size = serializers.IntegerField() | |
class XBlockMetadataSerializer(serializers.Serializer): | |
""" | |
Serializer for basic metadata about an XBlock | |
""" | |
block_id = serializers.CharField() | |
block_type = serializers.CharField() | |
# Avoid validation failure if studio returns empty display_name for xblock | |
display_name = serializers.CharField(allow_blank=True) | |
index_dictionary = GenericObjectSerializer(required=False) | |
student_view_data = GenericObjectSerializer(required=False) | |
children = serializers.ListField(child=serializers.CharField(), required=False) | |
editable_children = serializers.ListField(child=serializers.CharField(), required=False) | |
def validate(self, attrs): | |
""" | |
Field is allowed to be blank, to avoid olx editor failure. | |
Log error on empty display_name. | |
""" | |
super().validate(attrs) | |
if not attrs.get('display_name', ''): | |
message = 'Missing `display_name` attribute for {block_id} olx.'.format( | |
block_id=attrs.get('block_id') | |
) | |
log.warning(message) | |
return attrs | |
######################################################################################################################## | |
####### Helper functions ############################################################################################### | |
def _deserialize_with(serializer_class, data): | |
""" | |
Use the specified DRF serializer to parse a response | |
""" | |
serializer = serializer_class(data=data) | |
serializer.is_valid(raise_exception=True) | |
return SimpleNamespace(**serializer.validated_data) | |
def _update_session_language_header(session, language): | |
accept_language = f'{language},en;q=0.8' if language and not language.startswith('en') else 'en' | |
session.headers.update({'Accept-Language': accept_language}) | |
######################################################################################################################## | |
####### URLs ########################################################################################################### | |
URL_LIB_PREFIX = '/api/libraries/v2/' | |
URL_LIB_CREATE = URL_LIB_PREFIX | |
URL_LIB_DETAIL = URL_LIB_PREFIX + '{lib_key}/' # Get data about a library, update or delete library | |
URL_LIB_BLOCK_TYPES = URL_LIB_DETAIL + 'block_types/' # Get the list of XBlock types that can be added to this library | |
URL_LIB_LINKS = URL_LIB_DETAIL + 'links/' # Get the list of links defined for this content library | |
URL_LIB_LINK = URL_LIB_DETAIL + 'links/{link_id}/' # Update a specific link | |
URL_LIB_COMMIT = URL_LIB_DETAIL + 'commit/' # Commit (POST) or revert (DELETE) all pending changes to this library | |
URL_LIB_BLOCKS = URL_LIB_DETAIL + 'blocks/' # Get the list of XBlocks in this library, or add a new one | |
URL_LIB_BLOCK = URL_LIB_PREFIX + 'blocks/{block_key}/' # Get data about a block, or delete it | |
URL_LIB_BLOCK_OLX = URL_LIB_BLOCK + 'olx/' # Get or set the OLX of the specified XBlock | |
URL_LIB_BLOCK_ASSETS = URL_LIB_BLOCK + 'assets/' # Get the static asset files belonging to the specified XBlock | |
URL_LIB_BLOCK_ASSET = URL_LIB_BLOCK + 'assets/{filename}' # Get a static asset file belonging to the specified XBlock | |
URL_BLOCK_BASE = '/api/xblock/v2/xblocks/{block_key}/' | |
URL_BLOCK_METADATA = URL_BLOCK_BASE | |
URL_BLOCK_RENDER_VIEW = URL_BLOCK_BASE + 'view/{view_name}/' | |
URL_BLOCK_GET_HANDLER_URL = URL_BLOCK_BASE + 'handler_url/{handler_name}/' | |
BLOCK_GET_HANDLER_URL_CACHE_KEY = '{username}:{url}' | |
URL_PATHWAYS_PREFIX = '/api/lx-pathways/v1/pathway/' | |
URL_PATHWAYS_DETAIL = URL_PATHWAYS_PREFIX + '{pathway_key}/' | |
URL_PATHWAYS_PUBLISH = URL_PATHWAYS_PREFIX + '{pathway_key}/publish/' | |
URL_MODULESTORE_BLOCK_OLX = '/api/olx-export/v1/xblock/{block_key}/' | |
URL_COURSES_BASE = '/api/courses/v1/' | |
URL_COURSES_LIST = URL_COURSES_BASE + 'courses/' | |
URL_COURSES_BLOCKS = URL_COURSES_BASE + 'blocks/' | |
COURSE_LIST_CACHE_KEY = 'course-list:{username}:{org}' | |
URL_ENROLLMENT_BASE = '/api/enrollment/v1/' | |
URL_ENROLLMENT_ROLES = URL_ENROLLMENT_BASE + 'roles/' | |
######################################################################################################################## | |
####### EdxAppClient ################################################################################################### | |
class EdxAppClient: | |
""" | |
API client for Open edX LMS/Studio (a.k.a. edxapp / edx-platform) | |
Use this to connect to Open edX as a service user (as your backend application); use the | |
EdxAppUserClient subclass to connect as a particular user. | |
""" | |
def __init__(self, lms_url, studio_url, oauth_key=None, oauth_secret=None): | |
""" | |
Initialize this Open edX API client | |
""" | |
# The LMS URL, e.g. "http://edx.devstack.lms:18000" | |
self.lms_url = lms_url | |
# The Studio URL, e.g. "http://edx.devstack.studio:18010" | |
self.studio_url = studio_url | |
# The OAuth2 client key used to connect to Studio. Not required if | |
# connecting as a specific user (see EdxAppUserClient subclass) | |
self.client_oauth_key = oauth_key # e.g. settings.LMS_API_AUTH_KEY | |
self.client_oauth_secret = oauth_secret # e.g. settings.LMS_API_AUTH_SECRET | |
# The path to the LMS's OAuth2 access token endpoint, e.g. "http://edx.devstack.lms:18000/oauth2/access_token" | |
self.token_url = lms_url + '/oauth2/access_token' | |
self.session, refresh_now = self.init_session() | |
if refresh_now: | |
self.refresh_session_token() | |
def init_session(self): | |
""" | |
Initialize the HTTP session that this client will use. | |
Returns the session and a boolean indicating if refresh_session_token | |
should immediately be called, as opposed to calling it only after trying | |
an API call and getting a 401 response. | |
""" | |
client = BackendApplicationClient(client_id=self.client_oauth_key) | |
return OAuth2Session(client=client), True | |
def refresh_session_token(self): | |
""" | |
Refreshes the authenticated session with a new token. | |
""" | |
# We cannot use session.fetch_token() because it sends the client | |
# credentials using HTTP basic auth instead of as POST form data. | |
res = requests.post(self.token_url, data={ | |
'client_id': self.client_oauth_key, | |
'client_secret': self.client_oauth_secret, | |
'grant_type': 'client_credentials' | |
}) | |
res.raise_for_status() | |
data = res.json() | |
self.session.token = {'access_token': data['access_token']} | |
def call_lms_raw(self, method, path, **kwargs): | |
""" | |
Make an LMS API call and return the HTTP response. | |
""" | |
url = self.lms_url + path | |
endpoint = kwargs.pop('edx_endpoint', '') | |
try: | |
response = self.session.request(method, url, **kwargs) | |
if response.status_code == 401: | |
raise TokenExpiredError | |
except TokenExpiredError: | |
self.refresh_session_token() | |
response = self.session.request(method, url, **kwargs) | |
return response | |
def call_studio_raw(self, method, path, **kwargs): | |
""" | |
Make a Studio API call and return the HTTP response. | |
""" | |
url = self.studio_url + path | |
endpoint = kwargs.pop('edx_endpoint', '') | |
try: | |
response = self.session.request(method, url, **kwargs) | |
if response.status_code == 401: | |
raise TokenExpiredError | |
except TokenExpiredError: | |
self.refresh_session_token() | |
response = self.session.request(method, url, **kwargs) | |
return response | |
def call_lms(self, method, path, **kwargs): | |
""" | |
Make an API call from the LMS. Returns the parsed JSON response. | |
""" | |
response = self.call_lms_raw(method, path, **kwargs) | |
if response.status_code == 400: | |
log.error("400 Bad Request from LMS: %s", response.content) | |
response.raise_for_status() | |
if response.status_code == 204: # No content | |
return None | |
return response.json() | |
def call_studio(self, method, path, **kwargs): | |
""" | |
Make an API call from Studio. Returns the parsed JSON response. | |
""" | |
response = self.call_studio_raw(method, path, **kwargs) | |
if response.status_code == 400: | |
log.error("400 Bad Request from Studio: %s", response.content) | |
response.raise_for_status() | |
if response.status_code == 204: # No content | |
return None | |
return response.json() | |
# Authentication and User Accounts ######################################### | |
def get_username(self, required=True) -> str: | |
""" | |
Get the LMS/Studio username being used to make API requests. | |
If this API client cannot make authenticated calls to the Open edX API, | |
this will raise an exception, unless required is False, in which case it | |
will just return None. | |
""" | |
response = self.call_lms_raw('get', '/api/user/v1/me') | |
if required: | |
response.raise_for_status() | |
else: | |
if response.status_code == 401: | |
return None | |
return response.json()["username"] | |
def get_user_details(self, username): | |
""" | |
Get a user's profile. | |
""" | |
return self.call_lms('get', f'/api/user/v1/accounts/{username}') | |
# Content Libraries ######################################################## | |
def get_library(self, lib_key): | |
""" Get a content library """ | |
data = self.call_studio('get', URL_LIB_DETAIL.format(lib_key=lib_key), edx_endpoint=URL_LIB_DETAIL) | |
return _deserialize_with(serializers.ContentLibraryMetadataSerializer, data) | |
def library_exists(self, lib_key): | |
""" Does the specified content library exist? """ | |
response = self.call_studio_raw('get', URL_LIB_DETAIL.format(lib_key=lib_key)) | |
return response.status_code == 200 | |
def create_library(self, org_id, collection_uuid, slug, title, description=""): | |
""" Create a content library """ | |
data = self.call_studio('post', URL_LIB_CREATE, json={ | |
"org": org_id, | |
"collection_uuid": str(collection_uuid), | |
"slug": slug, | |
"title": title, | |
"description": description, | |
"allow_public_learning": True, | |
"allow_public_read": False, | |
}, edx_endpoint=URL_LIB_CREATE) | |
return _deserialize_with(serializers.ContentLibraryMetadataSerializer, data) | |
def delete_library(self, lib_key): | |
""" Delete an existing content library """ | |
self.call_studio('delete', URL_LIB_DETAIL.format(lib_key=lib_key), edx_endpoint=URL_LIB_DETAIL) | |
def get_library_links(self, lib_key): | |
""" Get the links in this content library """ | |
links = self.call_studio('get', URL_LIB_LINKS.format(lib_key=lib_key), edx_endpoint=URL_LIB_LINKS) | |
return [_deserialize_with(serializers.LibraryLinkSerializer, link) for link in links] | |
def get_library_link(self, lib_key, link_id): | |
""" Get a single link for the specified content library, or None """ | |
links = self.get_library_links(lib_key) | |
for link in links: | |
if link.id == link_id: | |
return link | |
return None | |
def create_library_link(self, lib_key, link_id: str, target_lib, version=None): | |
""" | |
Modify the library 'libraryId' to include a new link to the specified | |
library. Will fail if a link with the same name already exists. | |
Can optionally link to a specific version of the target library. | |
""" | |
return self.call_studio('post', URL_LIB_LINKS.format(lib_key=lib_key), json={ | |
"id": link_id, | |
"opaque_key": str(target_lib), | |
"version": version, | |
}, edx_endpoint=URL_LIB_LINKS) | |
def update_library_link(self, lib_key, link_id, version=None): | |
""" | |
Change the version of an existing library link. Set version=None to use | |
the latest version. | |
""" | |
self.call_studio('patch', URL_LIB_LINK.format(lib_key=lib_key, link_id=link_id), json={ | |
"version": version, | |
}, edx_endpoint=URL_LIB_LINK) | |
def delete_library_link(self, lib_key, link_id): | |
""" | |
Delete a link from the specified library. | |
""" | |
self.call_studio('delete', URL_LIB_LINK.format(lib_key=lib_key, link_id=link_id), edx_endpoint=URL_LIB_LINK) | |
def get_library_block_olx(self, block_key): | |
""" Get the OLX of a specific block in a library """ | |
data = self.call_studio('get', URL_LIB_BLOCK_OLX.format(block_key=block_key), edx_endpoint=URL_LIB_BLOCK_OLX) | |
return data["olx"] | |
def set_library_block_olx(self, block_key, new_olx): | |
""" Overwrite the OLX of a specific block in the library """ | |
return self.call_studio( | |
'post', | |
URL_LIB_BLOCK_OLX.format(block_key=block_key), | |
json={"olx": new_olx}, | |
edx_endpoint=URL_LIB_BLOCK_OLX, | |
) | |
def commit_library_changes(self, lib_key): | |
""" Commit changes to an existing library """ | |
return self.call_studio('post', URL_LIB_COMMIT.format(lib_key=lib_key), edx_endpoint=URL_LIB_COMMIT) | |
def revert_library_changes(self, lib_key): | |
""" Revert pending changes to an existing library """ | |
self.call_studio('delete', URL_LIB_COMMIT.format(lib_key=lib_key), edx_endpoint=URL_LIB_COMMIT) | |
def add_block_to_library(self, lib_key, block_type, slug, parent_block=None): | |
""" Add a new XBlock to the library """ | |
block_info = {"block_type": block_type, "definition_id": slug} | |
if parent_block: | |
block_info["parent_block"] = str(parent_block) | |
data = self.call_studio( | |
'post', | |
URL_LIB_BLOCKS.format(lib_key=lib_key), | |
json=block_info, | |
edx_endpoint=URL_LIB_BLOCKS, | |
) | |
return _deserialize_with(serializers.LibraryXBlockMetadataSerializer, data) | |
def get_library_block(self, block_key): | |
""" Get a specific block in the library """ | |
data = self.call_studio('get', URL_LIB_BLOCK.format(block_key=block_key), edx_endpoint=URL_LIB_BLOCK) | |
return _deserialize_with(serializers.LibraryXBlockMetadataSerializer, data) | |
def get_library_block_or_none(self, block_key): | |
""" Get a specific block in the library, or None if it does not exist """ | |
try: | |
return self.get_library_block(block_key) | |
except requests.HTTPError as err: | |
if err.response.status_code == 404: | |
return None | |
raise | |
def delete_library_block(self, block_key): | |
""" Delete a specific block from the library """ | |
self.call_studio('delete', URL_LIB_BLOCK.format(block_key=block_key), edx_endpoint=URL_LIB_BLOCK) | |
def get_library_blocks(self, lib_key): | |
""" Get the list of XBlocks in the library """ | |
data = self.call_studio('get', URL_LIB_BLOCKS.format(lib_key=lib_key), edx_endpoint=URL_LIB_BLOCKS) | |
return [_deserialize_with(serializers.LibraryXBlockMetadataSerializer, block) for block in data] | |
def get_editable_child_blocks_of_library_block(self, block_key: LibraryUsageLocatorV2): | |
""" | |
Get a list of block IDs of child XBlocks that are in the same library. | |
This excludes linked XBlocks from other bundles, because those are not | |
editable. | |
""" | |
metadata = self.get_xblock_metadata(block_key, extra_fields=['editable_children']) | |
return [UsageKey.from_string(k) for k in metadata.editable_children] | |
def get_library_block_assets(self, block_key): | |
""" | |
Get a list of the static asset files belonging to the specified XBlock. | |
""" | |
url = URL_LIB_BLOCK_ASSETS.format(block_key=block_key) | |
files = self.call_studio('get', url, edx_endpoint=URL_LIB_BLOCK_ASSETS)["files"] | |
return [_deserialize_with(serializers.LibraryXBlockAssetFileSerializer, f) for f in files] | |
def get_library_block_asset_file(self, block_key, filename, else_none=False): | |
""" | |
Get metadata about a static asset file belonging to the specified XBlock. | |
""" | |
url = URL_LIB_BLOCK_ASSET.format(block_key=block_key, filename=filename) | |
response = self.call_studio_raw('get', url) | |
if response.status_code == 404 and else_none: | |
return None | |
response.raise_for_status() | |
return _deserialize_with(serializers.LibraryXBlockAssetFileSerializer, response.json()) | |
def set_library_block_asset_file(self, block_key, filename, content): | |
""" | |
Upload/replace the static asset file with the given name belonging to | |
the specified XBlock. | |
""" | |
url = URL_LIB_BLOCK_ASSET.format(block_key=block_key, filename=filename) | |
data = self.call_studio('put', url, files={"content": content}, edx_endpoint=URL_LIB_BLOCK_ASSET) | |
return _deserialize_with(serializers.LibraryXBlockAssetFileSerializer, data) | |
# XBlock API ############################################################### | |
def render_block_view(self, block_key, view_name): | |
""" | |
Render an XBlock's view via the LMS. | |
""" | |
url = URL_BLOCK_RENDER_VIEW.format(block_key=block_key, view_name=view_name) | |
return self.call_lms('get', url, edx_endpoint=URL_BLOCK_RENDER_VIEW) | |
def render_block_view_studio(self, block_key, view_name): | |
""" | |
Render an XBlock's view via Studio, which shows the draft version of the | |
block. | |
""" | |
url = URL_BLOCK_RENDER_VIEW.format(block_key=block_key, view_name=view_name) | |
return self.call_studio('get', url, edx_endpoint=URL_BLOCK_RENDER_VIEW) | |
def get_xblock_metadata(self, block_key, extra_fields=None): | |
""" | |
Get the metadata for any XBlock that uses the new Blockstore-based | |
XBlock runtime. | |
""" | |
url = URL_BLOCK_METADATA.format(block_key=block_key) | |
data = self.call_lms( | |
'get', | |
url, | |
params={"include": ','.join(extra_fields or [])}, | |
edx_endpoint=URL_BLOCK_METADATA, | |
) | |
# This is a hack to continue working with the upstream api before | |
# https://github.com/edx/edx-platform/pull/23246 is deployed. | |
if "index_dictionary" not in data: | |
data["index_dictionary"] = {} | |
return _deserialize_with(serializers.XBlockMetadataSerializer, data) | |
def get_xblock_children(self, block_key: UsageKey): | |
""" | |
Get a list of block IDs of child XBlocks. | |
""" | |
metadata = self.get_xblock_metadata(block_key, extra_fields=['children']) | |
return [UsageKey.from_string(k) for k in metadata.children] | |
def get_block_handler_url(self, block_key, handler_name, use_cached_handler_url=False): | |
""" | |
Get the URL to call a specific XBlock's handler via the LMS. | |
""" | |
url = URL_BLOCK_GET_HANDLER_URL.format(block_key=block_key, handler_name=handler_name) | |
if not hasattr(self, 'user'): | |
# Normal version - just query the LMS and return the result. | |
# This is the only version that will work for anonymous users. | |
return self.call_lms('get', url, edx_endpoint=URL_BLOCK_GET_HANDLER_URL)["handler_url"] | |
# Version with caching: | |
cache_key = BLOCK_GET_HANDLER_URL_CACHE_KEY.format(username=self.user.username, url=url) # pylint: disable=no-member | |
handler_url = cache.get(cache_key) | |
if use_cached_handler_url and handler_url: | |
return handler_url | |
handler_url = self.call_lms('get', url, edx_endpoint=URL_BLOCK_GET_HANDLER_URL)["handler_url"] | |
cache.set(cache_key, handler_url, 60 * 60) | |
return handler_url | |
def get_block_handler_url_studio(self, block_key, handler_name): | |
""" | |
Get the URL to call a specific XBlock's handler via Studio. | |
""" | |
url = URL_BLOCK_GET_HANDLER_URL.format(block_key=block_key, handler_name=handler_name) | |
return self.call_studio('get', url, edx_endpoint=URL_BLOCK_GET_HANDLER_URL)["handler_url"] | |
def call_block_handler( | |
self, block_key, handler_name, method='get', json_response=True, | |
suffix=None, use_cached_handler_url=False, **kwargs | |
): | |
""" | |
Call an XBlock handler (in the LMS) and get the data it returns. | |
You can pass in any parameters that 'requests' accepts. | |
""" | |
handler_url = self.get_block_handler_url(block_key, handler_name, use_cached_handler_url) | |
# Hack: On devstacks, the LMS will return handler_urls that the user can | |
# access via their browser, but we need a URL that works over the docker | |
# network. This doesn't affect prod. (Is there a better way to deal with this?) | |
handler_url = handler_url.replace(settings.LMS_ROOT_PUBLIC, settings.LMS_ROOT) | |
if suffix: | |
handler_url += suffix | |
response = self.session.request(method, handler_url, **kwargs) | |
if json_response: | |
return response.json() | |
return response | |
def call_block_handler_studio( | |
self, block_key, handler_name, method='get', json_response=True, suffix=None, **kwargs, | |
): | |
""" | |
Call an XBlock handler (in Studio/draft mode) and get the data it returns. | |
You can pass in any parameters that 'requests' accepts. | |
""" | |
# we can still get the url information from the LMS | |
handler_url = self.get_block_handler_url(block_key, handler_name) | |
handler_url = handler_url.replace(settings.LMS_ROOT_PUBLIC, settings.CMS_ROOT) | |
if suffix: | |
handler_url += '/' + suffix | |
response = self.session.request(method, handler_url, **kwargs) | |
if json_response: | |
return response.json() | |
return response | |
# Miscellaneous ############################################################ | |
def get_modulestore_block_olx_data(self, block_key): | |
""" | |
Use the Open edX OLX REST API plugin to get the OLX of the specified | |
XBlock. | |
See https://github.com/open-craft/openedx-olx-rest-api | |
""" | |
url = URL_MODULESTORE_BLOCK_OLX.format(block_key=block_key) | |
return self.call_studio('get', url, edx_endpoint=URL_MODULESTORE_BLOCK_OLX) | |
def get_courses(self, org=None): | |
""" | |
Get the full list of courses visible to the requesting user. Anonymous usage is not supported. | |
This can be an expensive call, as it may return hundreds of courses, so cache it. | |
""" | |
cache_key = COURSE_LIST_CACHE_KEY.format(username=self.user.username, org=org if org else "all") # pylint: disable=no-member | |
course_list = cache.get(cache_key) | |
if course_list: | |
return course_list | |
def _get_courses(url, params=None): | |
""" Recursively load all pages of course data. """ | |
data = self.call_lms("get", url, params=params, edx_endpoint=URL_COURSES_LIST) | |
results = data.get("results", []) | |
next_page_url = data.get("pagination", {}).get("next") | |
if next_page_url: | |
next_page = urlparse(next_page_url) | |
results += _get_courses(next_page.path, parse_qs(next_page.query)) | |
return results | |
# Fetch using the API's maximum page size to minimize the risk of throttling. | |
params = {"page_size": 100} | |
if org: | |
params["org"] = org | |
course_list = _get_courses(URL_COURSES_LIST, params) | |
cache.set(cache_key, course_list, 60 * 60) | |
return course_list | |
def get_course_blocks(self, course_id, block_types=None, requested_fields=None): | |
""" | |
Get the list of course access roles for the requesting user. | |
""" | |
params = { | |
"course_id": course_id, | |
"all_blocks": True, | |
"depth": "all", | |
"return_type": "dict", | |
} | |
if block_types: | |
params["block_types_filter"] = block_types | |
if requested_fields: | |
params["requested_fields"] = requested_fields | |
response = self.call_lms("get", URL_COURSES_BLOCKS, params=params, edx_endpoint=URL_COURSES_BLOCKS) | |
return response.get("blocks") | |
def get_course_access_roles(self, course_id=None): | |
""" | |
Get the list of course access roles for the requesting user. | |
""" | |
params = None | |
if course_id: | |
params = {"course_id": course_id} | |
return self.call_lms("get", URL_ENROLLMENT_ROLES, params=params, edx_endpoint=URL_ENROLLMENT_ROLES) | |
class EdxAppUserClient(EdxAppClient): | |
""" | |
API client for making API requests using a user's account. | |
This only works for registered users who logged in using SSO via | |
https://github.com/edx/auth-backends | |
""" | |
def __init__(self, user): | |
""" | |
Initialize this API client | |
""" | |
self.user = user | |
super().__init__( | |
lms_url=settings.LMS_ROOT, | |
studio_url=settings.CMS_ROOT, | |
oauth_key=None, | |
oauth_secret=None, | |
) # Don't pass any params to the parent | |
def init_session(self): | |
""" | |
Initialize the HTTP session that this client will use. | |
""" | |
if self.user.is_anonymous: | |
raise Exception("EdxAppUserClient does not support anonymous users. Use EdxAppAnonymousUserClient.") | |
try: | |
user_social_auth = self.user.social_auth.get(provider='edx-oauth2') | |
except UserSocialAuth.DoesNotExist: | |
raise Exception("The user did not log in with SSO, so we can't call the Open edX API on their behalf") | |
user_access_token = user_social_auth.extra_data['access_token'] | |
session = requests.Session() | |
# When the user first logs in via SSO, they will get a JWT access token. | |
# But when we use the refresh_token to refresh it, we'll get a normal OAuth2 | |
# access token back. | |
if '.' in user_access_token: | |
session.headers.update({"Authorization": f"JWT {user_access_token}"}) | |
else: | |
session.headers.update({"Authorization": f"Bearer {user_access_token}"}) | |
# We want the LMS to take language from request and return translated content accordingly | |
language = self.user.profile.preferred_language | |
_update_session_language_header(session, language) | |
return session, False | |
def refresh_session_token(self): | |
""" | |
Refreshes the authenticated session with a new token. | |
""" | |
expired_access_token = self.user.social_auth.get(provider='edx-oauth2').extra_data.get('access_token') | |
with transaction.atomic(using='default'): # Need to specify 'default' or the transaction doesn't work | |
# Now, acquire a write lock on the user's social_auth entry. | |
# This will pause at this point if another thread is also refreshing the token, which is quite likely | |
# due to our application's use of parallel API requests. | |
user_social_auth = self.user.social_auth.using('default').select_for_update().get(provider='edx-oauth2') | |
user_access_token = user_social_auth.extra_data.get('access_token') | |
if user_access_token != expired_access_token: | |
log.debug("Another backend process already refreshed the token while we waited for the row to unlock") | |
else: | |
log.info(f"Updating access_token and refresh_token for {self.user.username}") | |
response = requests.post(self.token_url, data={ | |
'client_id': settings.SOCIAL_AUTH_EDX_OAUTH2_KEY, | |
'client_secret': settings.SOCIAL_AUTH_EDX_OAUTH2_SECRET, | |
'grant_type': 'refresh_token', | |
'refresh_token': user_social_auth.extra_data['refresh_token'], | |
}) | |
response.raise_for_status() | |
new_token_data = response.json() | |
user_access_token = new_token_data['access_token'] | |
user_social_auth.extra_data['refresh_token'] = new_token_data['refresh_token'] | |
user_social_auth.extra_data['access_token'] = user_access_token | |
user_social_auth.save(using='default', update_fields=['extra_data']) | |
self.session.headers.update({"Authorization": f"Bearer {user_access_token}"}) | |
class EdxAppAnonymousUserClient(EdxAppClient): | |
""" | |
API client for connecting to Open edX using a user who hasn't registered an | |
account. | |
Unlike the other EdXAppClient classes, this one authenticates using session | |
cookies, not OAuth (because anonymous users can't use OAuth but do have | |
sessions) | |
""" | |
def __init__(self, request): | |
""" | |
Initialize this Studio API client | |
""" | |
if not request.user.is_anonymous: | |
raise ValueError("EdxAppAnonymousUserClient only works for anonymous (non-registered) users.") | |
self.request = request | |
super().__init__(lms_url=settings.LMS_ROOT, studio_url=settings.CMS_ROOT, oauth_key=None, oauth_secret=None) | |
def init_session(self): | |
""" | |
Initialize the HTTP session that this client will use. | |
Note there are two sessions here: | |
(1) a requests.Session HTTP session between this backend and edxapp | |
(2) the current anonymous user's django session, in which we store the | |
session cookie required to authenticate the (1) requests.Session. | |
""" | |
session = requests.Session() | |
# Has this user already made an LMS/Studio request? | |
if 'edxapp_session_cookies' in self.request.session: | |
# The cookies (which we've stored in this user's django session) should be valid - use them as-is | |
session.cookies.update(self.request.session['edxapp_session_cookies']) | |
return session, False | |
def refresh_session_token(self): | |
""" | |
Refreshes the authenticated session credentials | |
""" | |
# call_lms_raw / call_studio_raw will update the session cookies on every request; no need to do that here | |
def call_lms_raw(self, method, path, **kwargs): | |
""" | |
Make an LMS API call and return the HTTP response. | |
""" | |
response = super().call_lms_raw(method, path, **kwargs) | |
# Sometimes the session cookie will change, so we need to save the latest version so it doesn't expire: | |
# We can't just do 'self.request.session['edxapp_session_cookies'] = dict(self.session.cookies)' | |
# because the LMS sends back a 'csrftoken' cookie with two different domains, leading to a CookieConflictError | |
cookies_to_save = {key: value for (key, value) in self.session.cookies.items() if 'session' in key} | |
self.request.session['edxapp_session_cookies'] = cookies_to_save | |
return response | |
def call_studio_raw(self, method, path, **kwargs): | |
""" | |
Make a Studio API call and return the HTTP response. | |
""" | |
response = super().call_studio_raw(method, path, **kwargs) | |
cookies_to_save = {key: value for (key, value) in self.session.cookies.items() if 'session' in key} | |
self.request.session['edxapp_session_cookies'] = cookies_to_save | |
return response | |
def default_openedx_client() -> EdxAppClient: | |
""" | |
Get the default client, which connects to Open edX as the service user. | |
This method has a side effect - it will make an HTTP request to the LMS the | |
first time it is called. | |
""" | |
if not hasattr(default_openedx_client, "client"): | |
default_openedx_client.client = EdxAppClient( | |
lms_url=settings.LMS_ROOT, | |
studio_url=settings.CMS_ROOT, | |
oauth_key=settings.LMS_API_AUTH_KEY, | |
oauth_secret=settings.LMS_API_AUTH_SECRET, | |
) | |
return default_openedx_client.client | |
def openedx_client_for_request(request) -> EdxAppClient: | |
""" | |
Given a django HTTP request, return an Open edX client instance for the | |
request's user, whether they are a registered or anonymous user. | |
""" | |
if request.user.is_anonymous: | |
return EdxAppAnonymousUserClient(request) | |
else: | |
return EdxAppUserClient(request.user) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment