Skip to content

Instantly share code, notes, and snippets.

@bradenmacdonald
Created May 11, 2021 01:36
Show Gist options
  • Save bradenmacdonald/930c7655dca32dc648af9cb0aed4a7c5 to your computer and use it in GitHub Desktop.
Save bradenmacdonald/930c7655dca32dc648af9cb0aed4a7c5 to your computer and use it in GitHub Desktop.
Open edX API Client Example
"""
API Client used to make requests from Open edX
Originally created by OpenCraft for LabXchange, www.labxchange.org
You may use this under the terms of the Apache 2 license,
https://www.apache.org/licenses/LICENSE-2.0
"""
import logging
from types import SimpleNamespace
from urllib.parse import urlparse, parse_qs
import requests
from django.conf import settings
from django.core.cache import cache
from django.db import transaction
from oauthlib.oauth2 import BackendApplicationClient, TokenExpiredError
from opaque_keys.edx.keys import UsageKey
from opaque_keys.edx.locator import LibraryUsageLocatorV2
from rest_framework import serializers
from social_django.models import UserSocialAuth
from requests_oauthlib import OAuth2Session
log = logging.getLogger(__name__)
########################################################################################################################
####### Serializers ####################################################################################################
class DataSerializer(serializers.Serializer):
    """
    Serializer for data that does not need to be persisted directly.

    Inherit from it to avoid needing to disable abstract-method warnings.
    Both persistence hooks below are deliberate no-ops that return None.
    """

    def create(self, validated_data):
        """No-op: nothing is created or stored for validated_data."""
        pass

    def update(self, instance, validated_data):
        """No-op: the given instance is left untouched."""
        pass
class GenericObjectSerializer(DataSerializer):
    """
    Serializer for any response that returns a JSON dict, without specifying
    the fields of that dict in detail.

    Values pass through unchanged in both directions.
    """

    def to_representation(self, instance):
        """Return the instance exactly as given."""
        return instance

    def to_internal_value(self, data):
        """Return the incoming data exactly as given (no per-field validation)."""
        return data

    class Meta:
        # Describes the value as a free-form object with arbitrary properties.
        # NOTE(review): presumably read by a swagger/OpenAPI schema generator
        # such as drf-yasg - confirm against the project's API docs setup.
        swagger_schema_fields = {
            "type": "object",
            "additionalProperties": True,
        }
class ContentLibraryMetadataSerializer(serializers.Serializer):
    """
    Serializer for metadata about a content library

    All fields are required; only `description` may be an empty string.
    """
    id = serializers.CharField()
    org = serializers.SlugField()
    slug = serializers.SlugField()
    bundle_uuid = serializers.UUIDField()
    title = serializers.CharField()
    # The only text field here that accepts an empty string
    description = serializers.CharField(allow_blank=True)
    version = serializers.IntegerField()
    has_unpublished_changes = serializers.BooleanField()
    has_unpublished_deletes = serializers.BooleanField()
class LibraryLinkSerializer(serializers.Serializer):
    """
    Serializer for a link from a content library to another blockstore bundle

    All fields are required; only `opaque_key` may be an empty string.
    """
    id = serializers.CharField()
    bundle_uuid = serializers.UUIDField()
    # NOTE(review): presumably `version` is the linked version and
    # `latest_version` the newest one available - confirm against the API.
    version = serializers.IntegerField()
    latest_version = serializers.IntegerField()
    # Accepts an empty string
    opaque_key = serializers.CharField(allow_blank=True)
class LibraryXBlockMetadataSerializer(serializers.Serializer):
    """
    Serializer for metadata about an XBlock in a content library

    All fields are required; only `display_name` may be an empty string.
    """
    id = serializers.CharField()
    def_key = serializers.CharField()
    block_type = serializers.CharField()
    # Accepts an empty string
    display_name = serializers.CharField(allow_blank=True)
    has_unpublished_changes = serializers.BooleanField()
class LibraryXBlockAssetFileSerializer(serializers.Serializer):
    """
    Serializer for metadata about a static asset file belonging to an XBlock in
    a content library

    All three fields are required.
    """
    path = serializers.CharField()
    url = serializers.CharField()
    # NOTE(review): presumably the file size in bytes - confirm against the API.
    size = serializers.IntegerField()
class XBlockMetadataSerializer(serializers.Serializer):
    """
    Serializer for basic metadata about an XBlock
    """
    block_id = serializers.CharField()
    block_type = serializers.CharField()
    # Studio can return an empty display_name for an XBlock; accept it rather
    # than fail validation.
    display_name = serializers.CharField(allow_blank=True)
    index_dictionary = GenericObjectSerializer(required=False)
    student_view_data = GenericObjectSerializer(required=False)
    children = serializers.ListField(child=serializers.CharField(), required=False)
    editable_children = serializers.ListField(child=serializers.CharField(), required=False)

    def validate(self, attrs):
        """
        Accept a blank display_name (to avoid olx editor failure), but log a
        warning whenever it is empty or missing.

        Returns the attrs unchanged.
        """
        super().validate(attrs)
        if attrs.get('display_name', ''):
            return attrs
        block_id = attrs.get('block_id')
        log.warning(f'Missing `display_name` attribute for {block_id} olx.')
        return attrs
########################################################################################################################
####### Helper functions ###############################################################################################
def _deserialize_with(serializer_class, data):
    """
    Validate ``data`` with the given DRF serializer class and return the
    validated fields as attributes of a SimpleNamespace.

    Validation runs with raise_exception=True, so invalid data raises instead
    of returning.
    """
    parser = serializer_class(data=data)
    parser.is_valid(raise_exception=True)
    validated_fields = parser.validated_data
    return SimpleNamespace(**validated_fields)
def _update_session_language_header(session, language):
    """
    Set the session's Accept-Language header from the given language code.

    A non-empty language that does not start with 'en' is sent as the
    preferred language with English as a q=0.8 fallback; anything else
    (None, '', or any 'en...' code) is sent as plain 'en'.
    """
    if language and not language.startswith('en'):
        header_value = f'{language},en;q=0.8'
    else:
        header_value = 'en'
    session.headers.update({'Accept-Language': header_value})
########################################################################################################################
####### URLs ###########################################################################################################
# --- Content libraries (v2) ---
URL_LIB_PREFIX = "/api/libraries/v2/"
URL_LIB_CREATE = URL_LIB_PREFIX
# Get data about a library, update or delete library
URL_LIB_DETAIL = URL_LIB_PREFIX + "{lib_key}/"
# Get the list of XBlock types that can be added to this library
URL_LIB_BLOCK_TYPES = URL_LIB_DETAIL + "block_types/"
# Get the list of links defined for this content library
URL_LIB_LINKS = URL_LIB_DETAIL + "links/"
# Update a specific link
URL_LIB_LINK = URL_LIB_LINKS + "{link_id}/"
# Commit (POST) or revert (DELETE) all pending changes to this library
URL_LIB_COMMIT = URL_LIB_DETAIL + "commit/"
# Get the list of XBlocks in this library, or add a new one
URL_LIB_BLOCKS = URL_LIB_DETAIL + "blocks/"
# Get data about a block, or delete it
URL_LIB_BLOCK = URL_LIB_PREFIX + "blocks/{block_key}/"
# Get or set the OLX of the specified XBlock
URL_LIB_BLOCK_OLX = URL_LIB_BLOCK + "olx/"
# Get the static asset files belonging to the specified XBlock
URL_LIB_BLOCK_ASSETS = URL_LIB_BLOCK + "assets/"
# Get a static asset file belonging to the specified XBlock
URL_LIB_BLOCK_ASSET = URL_LIB_BLOCK_ASSETS + "{filename}"

# --- XBlock API (v2) ---
URL_BLOCK_BASE = "/api/xblock/v2/xblocks/{block_key}/"
URL_BLOCK_METADATA = URL_BLOCK_BASE
URL_BLOCK_RENDER_VIEW = URL_BLOCK_BASE + "view/{view_name}/"
URL_BLOCK_GET_HANDLER_URL = URL_BLOCK_BASE + "handler_url/{handler_name}/"
BLOCK_GET_HANDLER_URL_CACHE_KEY = "{username}:{url}"

# --- Pathways ---
URL_PATHWAYS_PREFIX = "/api/lx-pathways/v1/pathway/"
URL_PATHWAYS_DETAIL = URL_PATHWAYS_PREFIX + "{pathway_key}/"
URL_PATHWAYS_PUBLISH = URL_PATHWAYS_DETAIL + "publish/"

# --- OLX export ---
URL_MODULESTORE_BLOCK_OLX = "/api/olx-export/v1/xblock/{block_key}/"

# --- Courses ---
URL_COURSES_BASE = "/api/courses/v1/"
URL_COURSES_LIST = URL_COURSES_BASE + "courses/"
URL_COURSES_BLOCKS = URL_COURSES_BASE + "blocks/"
COURSE_LIST_CACHE_KEY = "course-list:{username}:{org}"

# --- Enrollment ---
URL_ENROLLMENT_BASE = "/api/enrollment/v1/"
URL_ENROLLMENT_ROLES = URL_ENROLLMENT_BASE + "roles/"
########################################################################################################################
####### EdxAppClient ###################################################################################################
class EdxAppClient:
    """
    API client for Open edX LMS/Studio (a.k.a. edxapp / edx-platform)

    Use this to connect to Open edX as a service user (as your backend
    application); use the EdxAppUserClient subclass to connect as a particular
    user.
    """

    def __init__(self, lms_url, studio_url, oauth_key=None, oauth_secret=None):
        """
        Initialize this Open edX API client

        Args:
            lms_url: The LMS URL, e.g. "http://edx.devstack.lms:18000"
            studio_url: The Studio URL, e.g. "http://edx.devstack.studio:18010"
            oauth_key: The OAuth2 client key used to connect to Studio
                (e.g. settings.LMS_API_AUTH_KEY). Not required if connecting
                as a specific user (see EdxAppUserClient subclass).
            oauth_secret: The matching OAuth2 client secret
                (e.g. settings.LMS_API_AUTH_SECRET).

        Calls init_session() and, if that asks for it, immediately fetches a
        token with refresh_session_token() (which makes an HTTP request).
        """
        self.lms_url = lms_url
        self.studio_url = studio_url
        self.client_oauth_key = oauth_key
        self.client_oauth_secret = oauth_secret
        # The LMS's OAuth2 access token endpoint,
        # e.g. "http://edx.devstack.lms:18000/oauth2/access_token"
        self.token_url = lms_url + '/oauth2/access_token'
        self.session, refresh_now = self.init_session()
        if refresh_now:
            self.refresh_session_token()

    def init_session(self):
        """
        Initialize the HTTP session that this client will use.

        Returns the session and a boolean indicating if refresh_session_token
        should immediately be called, as opposed to calling it only after trying
        an API call and getting a 401 response.
        """
        client = BackendApplicationClient(client_id=self.client_oauth_key)
        return OAuth2Session(client=client), True

    def refresh_session_token(self):
        """
        Refreshes the authenticated session with a new token.

        Raises requests.HTTPError if the token endpoint returns an error status.
        """
        # We cannot use session.fetch_token() because it sends the client
        # credentials using HTTP basic auth instead of as POST form data.
        res = requests.post(self.token_url, data={
            'client_id': self.client_oauth_key,
            'client_secret': self.client_oauth_secret,
            'grant_type': 'client_credentials'
        })
        res.raise_for_status()
        data = res.json()
        self.session.token = {'access_token': data['access_token']}

    def _request_with_token_refresh(self, method, url, **kwargs):
        """
        Send a request on this client's session; on a 401 response or a
        TokenExpiredError, refresh the token once and retry once.

        Returns the HTTP response of the last attempt (which may itself be an
        error response; no status check is done here).
        """
        # 'edx_endpoint' is the un-formatted URL template that callers attach
        # to the call. It is not a 'requests' argument, so it must be removed
        # before the kwargs are forwarded. This client does not otherwise use it.
        kwargs.pop('edx_endpoint', None)
        try:
            response = self.session.request(method, url, **kwargs)
            if response.status_code == 401:
                raise TokenExpiredError
        except TokenExpiredError:
            self.refresh_session_token()
            response = self.session.request(method, url, **kwargs)
        return response

    def call_lms_raw(self, method, path, **kwargs):
        """
        Make an LMS API call and return the HTTP response.

        ``path`` is appended to the LMS root URL; remaining kwargs are passed
        to 'requests' (except 'edx_endpoint', which is stripped).
        """
        return self._request_with_token_refresh(method, self.lms_url + path, **kwargs)

    def call_studio_raw(self, method, path, **kwargs):
        """
        Make a Studio API call and return the HTTP response.

        ``path`` is appended to the Studio root URL; remaining kwargs are passed
        to 'requests' (except 'edx_endpoint', which is stripped).
        """
        return self._request_with_token_refresh(method, self.studio_url + path, **kwargs)

    def call_lms(self, method, path, **kwargs):
        """
        Make an API call from the LMS. Returns the parsed JSON response.

        Returns None for a 204 (No Content) response. Raises requests.HTTPError
        for error statuses; the body of a 400 response is logged first.
        """
        response = self.call_lms_raw(method, path, **kwargs)
        if response.status_code == 400:
            log.error("400 Bad Request from LMS: %s", response.content)
        response.raise_for_status()
        if response.status_code == 204:  # No content
            return None
        return response.json()

    def call_studio(self, method, path, **kwargs):
        """
        Make an API call from Studio. Returns the parsed JSON response.

        Returns None for a 204 (No Content) response. Raises requests.HTTPError
        for error statuses; the body of a 400 response is logged first.
        """
        response = self.call_studio_raw(method, path, **kwargs)
        if response.status_code == 400:
            log.error("400 Bad Request from Studio: %s", response.content)
        response.raise_for_status()
        if response.status_code == 204:  # No content
            return None
        return response.json()

    # Authentication and User Accounts #########################################

    def get_username(self, required=True) -> str:
        """
        Get the LMS/Studio username being used to make API requests.

        If this API client cannot make authenticated calls to the Open edX API,
        this will raise an exception, unless required is False, in which case it
        will just return None.
        """
        response = self.call_lms_raw('get', '/api/user/v1/me')
        if required:
            response.raise_for_status()
        elif response.status_code == 401:
            return None
        return response.json()["username"]

    def get_user_details(self, username):
        """
        Get a user's profile.
        """
        return self.call_lms('get', f'/api/user/v1/accounts/{username}')

    # Content Libraries ########################################################
    #
    # The response serializers used below are the classes defined at the top of
    # this module. They are referenced by bare name: the name 'serializers' in
    # this module is rest_framework.serializers, which does not contain them.

    def get_library(self, lib_key):
        """ Get a content library """
        data = self.call_studio('get', URL_LIB_DETAIL.format(lib_key=lib_key), edx_endpoint=URL_LIB_DETAIL)
        return _deserialize_with(ContentLibraryMetadataSerializer, data)

    def library_exists(self, lib_key):
        """ Does the specified content library exist? (True only for a 200 response) """
        response = self.call_studio_raw('get', URL_LIB_DETAIL.format(lib_key=lib_key))
        return response.status_code == 200

    def create_library(self, org_id, collection_uuid, slug, title, description=""):
        """ Create a content library """
        data = self.call_studio('post', URL_LIB_CREATE, json={
            "org": org_id,
            "collection_uuid": str(collection_uuid),
            "slug": slug,
            "title": title,
            "description": description,
            "allow_public_learning": True,
            "allow_public_read": False,
        }, edx_endpoint=URL_LIB_CREATE)
        return _deserialize_with(ContentLibraryMetadataSerializer, data)

    def delete_library(self, lib_key):
        """ Delete an existing content library """
        self.call_studio('delete', URL_LIB_DETAIL.format(lib_key=lib_key), edx_endpoint=URL_LIB_DETAIL)

    def get_library_links(self, lib_key):
        """ Get the links in this content library """
        links = self.call_studio('get', URL_LIB_LINKS.format(lib_key=lib_key), edx_endpoint=URL_LIB_LINKS)
        return [_deserialize_with(LibraryLinkSerializer, link) for link in links]

    def get_library_link(self, lib_key, link_id):
        """ Get a single link for the specified content library, or None """
        links = self.get_library_links(lib_key)
        for link in links:
            if link.id == link_id:
                return link
        return None

    def create_library_link(self, lib_key, link_id: str, target_lib, version=None):
        """
        Modify the library 'lib_key' to include a new link to the specified
        library. Will fail if a link with the same name already exists.
        Can optionally link to a specific version of the target library.
        """
        return self.call_studio('post', URL_LIB_LINKS.format(lib_key=lib_key), json={
            "id": link_id,
            "opaque_key": str(target_lib),
            "version": version,
        }, edx_endpoint=URL_LIB_LINKS)

    def update_library_link(self, lib_key, link_id, version=None):
        """
        Change the version of an existing library link. Set version=None to use
        the latest version.
        """
        self.call_studio('patch', URL_LIB_LINK.format(lib_key=lib_key, link_id=link_id), json={
            "version": version,
        }, edx_endpoint=URL_LIB_LINK)

    def delete_library_link(self, lib_key, link_id):
        """
        Delete a link from the specified library.
        """
        self.call_studio('delete', URL_LIB_LINK.format(lib_key=lib_key, link_id=link_id), edx_endpoint=URL_LIB_LINK)

    def get_library_block_olx(self, block_key):
        """ Get the OLX of a specific block in a library """
        data = self.call_studio('get', URL_LIB_BLOCK_OLX.format(block_key=block_key), edx_endpoint=URL_LIB_BLOCK_OLX)
        return data["olx"]

    def set_library_block_olx(self, block_key, new_olx):
        """ Overwrite the OLX of a specific block in the library """
        return self.call_studio(
            'post',
            URL_LIB_BLOCK_OLX.format(block_key=block_key),
            json={"olx": new_olx},
            edx_endpoint=URL_LIB_BLOCK_OLX,
        )

    def commit_library_changes(self, lib_key):
        """ Commit changes to an existing library """
        return self.call_studio('post', URL_LIB_COMMIT.format(lib_key=lib_key), edx_endpoint=URL_LIB_COMMIT)

    def revert_library_changes(self, lib_key):
        """ Revert pending changes to an existing library """
        self.call_studio('delete', URL_LIB_COMMIT.format(lib_key=lib_key), edx_endpoint=URL_LIB_COMMIT)

    def add_block_to_library(self, lib_key, block_type, slug, parent_block=None):
        """ Add a new XBlock to the library """
        block_info = {"block_type": block_type, "definition_id": slug}
        if parent_block:
            block_info["parent_block"] = str(parent_block)
        data = self.call_studio(
            'post',
            URL_LIB_BLOCKS.format(lib_key=lib_key),
            json=block_info,
            edx_endpoint=URL_LIB_BLOCKS,
        )
        return _deserialize_with(LibraryXBlockMetadataSerializer, data)

    def get_library_block(self, block_key):
        """ Get a specific block in the library """
        data = self.call_studio('get', URL_LIB_BLOCK.format(block_key=block_key), edx_endpoint=URL_LIB_BLOCK)
        return _deserialize_with(LibraryXBlockMetadataSerializer, data)

    def get_library_block_or_none(self, block_key):
        """ Get a specific block in the library, or None if it does not exist """
        try:
            return self.get_library_block(block_key)
        except requests.HTTPError as err:
            # Only a 404 means "does not exist"; anything else is a real error.
            if err.response.status_code == 404:
                return None
            raise

    def delete_library_block(self, block_key):
        """ Delete a specific block from the library """
        self.call_studio('delete', URL_LIB_BLOCK.format(block_key=block_key), edx_endpoint=URL_LIB_BLOCK)

    def get_library_blocks(self, lib_key):
        """ Get the list of XBlocks in the library """
        data = self.call_studio('get', URL_LIB_BLOCKS.format(lib_key=lib_key), edx_endpoint=URL_LIB_BLOCKS)
        return [_deserialize_with(LibraryXBlockMetadataSerializer, block) for block in data]

    def get_editable_child_blocks_of_library_block(self, block_key: LibraryUsageLocatorV2):
        """
        Get a list of block IDs of child XBlocks that are in the same library.
        This excludes linked XBlocks from other bundles, because those are not
        editable.
        """
        metadata = self.get_xblock_metadata(block_key, extra_fields=['editable_children'])
        return [UsageKey.from_string(k) for k in metadata.editable_children]

    def get_library_block_assets(self, block_key):
        """
        Get a list of the static asset files belonging to the specified XBlock.
        """
        url = URL_LIB_BLOCK_ASSETS.format(block_key=block_key)
        files = self.call_studio('get', url, edx_endpoint=URL_LIB_BLOCK_ASSETS)["files"]
        return [_deserialize_with(LibraryXBlockAssetFileSerializer, f) for f in files]

    def get_library_block_asset_file(self, block_key, filename, else_none=False):
        """
        Get metadata about a static asset file belonging to the specified XBlock.

        With else_none=True a 404 response yields None instead of raising.
        """
        url = URL_LIB_BLOCK_ASSET.format(block_key=block_key, filename=filename)
        response = self.call_studio_raw('get', url)
        if response.status_code == 404 and else_none:
            return None
        response.raise_for_status()
        return _deserialize_with(LibraryXBlockAssetFileSerializer, response.json())

    def set_library_block_asset_file(self, block_key, filename, content):
        """
        Upload/replace the static asset file with the given name belonging to
        the specified XBlock.
        """
        url = URL_LIB_BLOCK_ASSET.format(block_key=block_key, filename=filename)
        data = self.call_studio('put', url, files={"content": content}, edx_endpoint=URL_LIB_BLOCK_ASSET)
        return _deserialize_with(LibraryXBlockAssetFileSerializer, data)

    # XBlock API ###############################################################

    def render_block_view(self, block_key, view_name):
        """
        Render an XBlock's view via the LMS.
        """
        url = URL_BLOCK_RENDER_VIEW.format(block_key=block_key, view_name=view_name)
        return self.call_lms('get', url, edx_endpoint=URL_BLOCK_RENDER_VIEW)

    def render_block_view_studio(self, block_key, view_name):
        """
        Render an XBlock's view via Studio, which shows the draft version of the
        block.
        """
        url = URL_BLOCK_RENDER_VIEW.format(block_key=block_key, view_name=view_name)
        return self.call_studio('get', url, edx_endpoint=URL_BLOCK_RENDER_VIEW)

    def get_xblock_metadata(self, block_key, extra_fields=None):
        """
        Get the metadata for any XBlock that uses the new Blockstore-based
        XBlock runtime.

        extra_fields is an optional list of field names, sent to the LMS as a
        comma-separated 'include' parameter.
        """
        url = URL_BLOCK_METADATA.format(block_key=block_key)
        data = self.call_lms(
            'get',
            url,
            params={"include": ','.join(extra_fields or [])},
            edx_endpoint=URL_BLOCK_METADATA,
        )
        # This is a hack to continue working with the upstream api before
        # https://github.com/edx/edx-platform/pull/23246 is deployed.
        if "index_dictionary" not in data:
            data["index_dictionary"] = {}
        return _deserialize_with(XBlockMetadataSerializer, data)

    def get_xblock_children(self, block_key: UsageKey):
        """
        Get a list of block IDs of child XBlocks.
        """
        metadata = self.get_xblock_metadata(block_key, extra_fields=['children'])
        return [UsageKey.from_string(k) for k in metadata.children]

    def get_block_handler_url(self, block_key, handler_name, use_cached_handler_url=False):
        """
        Get the URL to call a specific XBlock's handler via the LMS.

        For clients that have a 'user' attribute the result is cached per
        username for one hour; the cached value is only returned when
        use_cached_handler_url is True, but a fresh value is always stored.
        """
        url = URL_BLOCK_GET_HANDLER_URL.format(block_key=block_key, handler_name=handler_name)
        if not hasattr(self, 'user'):
            # Normal version - just query the LMS and return the result.
            # This is the only version that will work for anonymous users.
            return self.call_lms('get', url, edx_endpoint=URL_BLOCK_GET_HANDLER_URL)["handler_url"]
        # Version with caching:
        cache_key = BLOCK_GET_HANDLER_URL_CACHE_KEY.format(username=self.user.username, url=url)  # pylint: disable=no-member
        handler_url = cache.get(cache_key)
        if use_cached_handler_url and handler_url:
            return handler_url
        handler_url = self.call_lms('get', url, edx_endpoint=URL_BLOCK_GET_HANDLER_URL)["handler_url"]
        cache.set(cache_key, handler_url, 60 * 60)
        return handler_url

    def get_block_handler_url_studio(self, block_key, handler_name):
        """
        Get the URL to call a specific XBlock's handler via Studio.
        """
        url = URL_BLOCK_GET_HANDLER_URL.format(block_key=block_key, handler_name=handler_name)
        return self.call_studio('get', url, edx_endpoint=URL_BLOCK_GET_HANDLER_URL)["handler_url"]

    def call_block_handler(
        self, block_key, handler_name, method='get', json_response=True,
        suffix=None, use_cached_handler_url=False, **kwargs
    ):
        """
        Call an XBlock handler (in the LMS) and get the data it returns.
        You can pass in any parameters that 'requests' accepts.

        Returns the parsed JSON body, or the raw response if json_response is
        False. Unlike call_lms, no status check or token refresh is done here.
        """
        handler_url = self.get_block_handler_url(block_key, handler_name, use_cached_handler_url)
        # Hack: On devstacks, the LMS will return handler_urls that the user can
        # access via their browser, but we need a URL that works over the docker
        # network. This doesn't affect prod. (Is there a better way to deal with this?)
        handler_url = handler_url.replace(settings.LMS_ROOT_PUBLIC, settings.LMS_ROOT)
        if suffix:
            # NOTE(review): the suffix is appended as-is here, whereas
            # call_block_handler_studio inserts a '/' first - confirm which
            # form callers expect before unifying.
            handler_url += suffix
        response = self.session.request(method, handler_url, **kwargs)
        if json_response:
            return response.json()
        return response

    def call_block_handler_studio(
        self, block_key, handler_name, method='get', json_response=True, suffix=None, **kwargs,
    ):
        """
        Call an XBlock handler (in Studio/draft mode) and get the data it returns.
        You can pass in any parameters that 'requests' accepts.

        Returns the parsed JSON body, or the raw response if json_response is
        False. Unlike call_studio, no status check or token refresh is done here.
        """
        # we can still get the url information from the LMS
        handler_url = self.get_block_handler_url(block_key, handler_name)
        handler_url = handler_url.replace(settings.LMS_ROOT_PUBLIC, settings.CMS_ROOT)
        if suffix:
            handler_url += '/' + suffix
        response = self.session.request(method, handler_url, **kwargs)
        if json_response:
            return response.json()
        return response

    # Miscellaneous ############################################################

    def get_modulestore_block_olx_data(self, block_key):
        """
        Use the Open edX OLX REST API plugin to get the OLX of the specified
        XBlock.
        See https://github.com/open-craft/openedx-olx-rest-api
        """
        url = URL_MODULESTORE_BLOCK_OLX.format(block_key=block_key)
        return self.call_studio('get', url, edx_endpoint=URL_MODULESTORE_BLOCK_OLX)

    def get_courses(self, org=None):
        """
        Get the full list of courses visible to the requesting user. Anonymous usage is not supported.
        This can be an expensive call, as it may return hundreds of courses, so cache it.

        Requires a client with a 'user' attribute (the cache key includes the
        username). A non-empty result is cached for one hour.
        """
        cache_key = COURSE_LIST_CACHE_KEY.format(username=self.user.username, org=org if org else "all")  # pylint: disable=no-member
        course_list = cache.get(cache_key)
        if course_list:
            return course_list

        # Fetch using the API's maximum page size to minimize the risk of throttling.
        params = {"page_size": 100}
        if org:
            params["org"] = org

        # Follow the 'next' links page by page until there are none left.
        course_list = []
        url = URL_COURSES_LIST
        while True:
            data = self.call_lms("get", url, params=params, edx_endpoint=URL_COURSES_LIST)
            course_list.extend(data.get("results", []))
            next_page_url = data.get("pagination", {}).get("next")
            if not next_page_url:
                break
            # 'next' is an absolute URL; call_lms needs the path and the query
            # parameters separately.
            next_page = urlparse(next_page_url)
            url, params = next_page.path, parse_qs(next_page.query)

        cache.set(cache_key, course_list, 60 * 60)
        return course_list

    def get_course_blocks(self, course_id, block_types=None, requested_fields=None):
        """
        Get the blocks of the specified course from the LMS course blocks API.

        Requests all blocks at full depth in dict form, optionally filtered by
        block_types and extended with requested_fields. Returns the "blocks"
        entry of the response (None if the response has no such entry).
        """
        params = {
            "course_id": course_id,
            "all_blocks": True,
            "depth": "all",
            "return_type": "dict",
        }
        if block_types:
            params["block_types_filter"] = block_types
        if requested_fields:
            params["requested_fields"] = requested_fields
        response = self.call_lms("get", URL_COURSES_BLOCKS, params=params, edx_endpoint=URL_COURSES_BLOCKS)
        return response.get("blocks")

    def get_course_access_roles(self, course_id=None):
        """
        Get the list of course access roles for the requesting user.

        If course_id is given it is sent as a 'course_id' query parameter.
        """
        params = None
        if course_id:
            params = {"course_id": course_id}
        return self.call_lms("get", URL_ENROLLMENT_ROLES, params=params, edx_endpoint=URL_ENROLLMENT_ROLES)
class EdxAppUserClient(EdxAppClient):
    """
    API client for making API requests using a user's account.

    This only works for registered users who logged in using SSO via
    https://github.com/edx/auth-backends
    """

    def __init__(self, user):
        """
        Initialize this API client

        Args:
            user: the registered user on whose behalf requests are made.
        """
        # Must be set before super().__init__(), which calls init_session().
        self.user = user
        super().__init__(
            lms_url=settings.LMS_ROOT,
            studio_url=settings.CMS_ROOT,
            oauth_key=None,
            oauth_secret=None,
        )  # No client credentials: requests are authenticated with the user's own token

    def init_session(self):
        """
        Initialize the HTTP session that this client will use.

        Returns (session, False): the user's stored access token is used
        as-is and only refreshed after an API call gets a 401 response.

        Raises Exception for anonymous users and for users who have no
        'edx-oauth2' social auth entry.
        """
        if self.user.is_anonymous:
            raise Exception("EdxAppUserClient does not support anonymous users. Use EdxAppAnonymousUserClient.")
        try:
            user_social_auth = self.user.social_auth.get(provider='edx-oauth2')
        except UserSocialAuth.DoesNotExist as err:
            raise Exception(
                "The user did not log in with SSO, so we can't call the Open edX API on their behalf"
            ) from err
        user_access_token = user_social_auth.extra_data['access_token']
        session = requests.Session()
        # When the user first logs in via SSO, they will get a JWT access token.
        # But when we use the refresh_token to refresh it, we'll get a normal OAuth2
        # access token back.
        if '.' in user_access_token:
            session.headers.update({"Authorization": f"JWT {user_access_token}"})
        else:
            session.headers.update({"Authorization": f"Bearer {user_access_token}"})
        # We want the LMS to take language from request and return translated content accordingly
        language = self.user.profile.preferred_language
        _update_session_language_header(session, language)
        return session, False

    def refresh_session_token(self):
        """
        Refreshes the authenticated session with a new token.

        Uses the user's stored refresh_token to obtain a new access token,
        saves both new tokens on the user's social auth entry, and updates this
        session's Authorization header. If the stored token changed while
        waiting for the row lock, that newer token is used without another
        refresh request.
        """
        expired_access_token = self.user.social_auth.get(provider='edx-oauth2').extra_data.get('access_token')
        with transaction.atomic(using='default'):  # Need to specify 'default' or the transaction doesn't work
            # Now, acquire a write lock on the user's social_auth entry.
            # This will pause at this point if another thread is also refreshing the token, which is quite likely
            # due to our application's use of parallel API requests.
            user_social_auth = self.user.social_auth.using('default').select_for_update().get(provider='edx-oauth2')
            user_access_token = user_social_auth.extra_data.get('access_token')
            if user_access_token != expired_access_token:
                log.debug("Another backend process already refreshed the token while we waited for the row to unlock")
            else:
                log.info("Updating access_token and refresh_token for %s", self.user.username)
                response = requests.post(self.token_url, data={
                    'client_id': settings.SOCIAL_AUTH_EDX_OAUTH2_KEY,
                    'client_secret': settings.SOCIAL_AUTH_EDX_OAUTH2_SECRET,
                    'grant_type': 'refresh_token',
                    'refresh_token': user_social_auth.extra_data['refresh_token'],
                })
                response.raise_for_status()
                new_token_data = response.json()
                user_access_token = new_token_data['access_token']
                user_social_auth.extra_data['refresh_token'] = new_token_data['refresh_token']
                user_social_auth.extra_data['access_token'] = user_access_token
                user_social_auth.save(using='default', update_fields=['extra_data'])
        self.session.headers.update({"Authorization": f"Bearer {user_access_token}"})
class EdxAppAnonymousUserClient(EdxAppClient):
    """
    API client for connecting to Open edX using a user who hasn't registered an
    account.

    Unlike the other EdXAppClient classes, this one authenticates using session
    cookies, not OAuth (because anonymous users can't use OAuth but do have
    sessions)
    """

    def __init__(self, request):
        """
        Initialize this Studio API client

        Raises ValueError if the request's user is not anonymous.
        """
        if not request.user.is_anonymous:
            raise ValueError("EdxAppAnonymousUserClient only works for anonymous (non-registered) users.")
        # Must be set before super().__init__(), which calls init_session().
        self.request = request
        super().__init__(lms_url=settings.LMS_ROOT, studio_url=settings.CMS_ROOT, oauth_key=None, oauth_secret=None)

    def init_session(self):
        """
        Initialize the HTTP session that this client will use.

        Note there are two sessions here:
        (1) a requests.Session HTTP session between this backend and edxapp
        (2) the current anonymous user's django session, in which we store the
        session cookie required to authenticate the (1) requests.Session.
        """
        http_session = requests.Session()
        django_session = self.request.session
        # Has this user already made an LMS/Studio request?
        if 'edxapp_session_cookies' in django_session:
            # The cookies (which we've stored in this user's django session) should be valid - use them as-is
            http_session.cookies.update(django_session['edxapp_session_cookies'])
        return http_session, False

    def refresh_session_token(self):
        """
        Refreshes the authenticated session credentials
        """
        # call_lms_raw / call_studio_raw will update the session cookies on every request; no need to do that here

    def _store_session_cookies(self):
        """
        Copy the edxapp session cookies into the user's django session.
        """
        # Sometimes the session cookie will change, so we need to save the latest version so it doesn't expire:
        # We can't just do 'self.request.session['edxapp_session_cookies'] = dict(self.session.cookies)'
        # because the LMS sends back a 'csrftoken' cookie with two different domains, leading to a CookieConflictError
        cookies_to_save = {}
        for name, value in self.session.cookies.items():
            if 'session' in name:
                cookies_to_save[name] = value
        self.request.session['edxapp_session_cookies'] = cookies_to_save

    def call_lms_raw(self, method, path, **kwargs):
        """
        Make an LMS API call and return the HTTP response.
        """
        response = super().call_lms_raw(method, path, **kwargs)
        self._store_session_cookies()
        return response

    def call_studio_raw(self, method, path, **kwargs):
        """
        Make a Studio API call and return the HTTP response.
        """
        response = super().call_studio_raw(method, path, **kwargs)
        self._store_session_cookies()
        return response
def default_openedx_client() -> EdxAppClient:
    """
    Get the default client, which connects to Open edX as the service user.

    The client is created on first use and stored as an attribute of this
    function, so later calls return the same instance. This method has a side
    effect - it will make an HTTP request to the LMS the first time it is
    called.
    """
    if hasattr(default_openedx_client, "client"):
        return default_openedx_client.client
    service_client = EdxAppClient(
        lms_url=settings.LMS_ROOT,
        studio_url=settings.CMS_ROOT,
        oauth_key=settings.LMS_API_AUTH_KEY,
        oauth_secret=settings.LMS_API_AUTH_SECRET,
    )
    default_openedx_client.client = service_client
    return default_openedx_client.client
def openedx_client_for_request(request) -> EdxAppClient:
    """
    Given a django HTTP request, return an Open edX client instance for the
    request's user: a cookie-based client for anonymous users, otherwise a
    client that acts on behalf of the registered user.
    """
    user = request.user
    if not user.is_anonymous:
        return EdxAppUserClient(user)
    return EdxAppAnonymousUserClient(request)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment