Last active
August 12, 2021 19:17
-
-
Save awachat/751bf6a354ea414eb48f85af9780b270 to your computer and use it in GitHub Desktop.
Linkedin client to get firstName, lastName, profilePicture, emailAddress from OAuth2.0 access token using Linkedin 2.0 APIs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class LinkedinClient(object): | |
""" | |
Usage: | |
client = LinkedinClient(access_token) | |
user_details = client.get_user_details() | |
""" | |
FIELD_SELECTORS = ['id', 'firstName', 'lastName', | |
'profilePicture(displayImage~:playableStreams)', | |
'emailAddress'] | |
USER_DETAILS_URL = 'https://api.linkedin.com/v2/me' \ | |
'?projection=({projection})' | |
USER_EMAILS_URL = 'https://api.linkedin.com/v2/emailAddress' \ | |
'?q=members&projection=(elements*(handle~))' | |
PROFILE_PICTURE_WIDTH = 200 | |
PROFILE_PICTURE_HEIGHT = 200 | |
def __init__(self, access_token): | |
self.access_token = access_token | |
def user_details_url(self): | |
# use set() since LinkedIn fails when values are duplicated | |
fields_selectors = list( | |
set(['id', 'firstName', 'lastName'] + self.FIELD_SELECTORS or [])) | |
fields_selectors = ','.join(fields_selectors) | |
return self.USER_DETAILS_URL.format(projection=fields_selectors) | |
def user_emails_url(self): | |
return self.USER_EMAILS_URL | |
def get_headers(self): | |
return { | |
'Authorization': 'Bearer {access_token}'.format( | |
access_token=self.access_token) | |
} | |
def get_user_details(self): | |
""" | |
Get all the details of the Linkedin User | |
""" | |
user_details = self.get_json(self.user_details_url(), | |
headers=self.get_headers()) | |
self.process_error(user_details) | |
if 'emailAddress' in set(self.FIELD_SELECTORS or []): | |
user_details['emailAddress'] = self._get_email() | |
if 'firstName' in user_details.keys(): | |
user_details['firstName'] = self._get_name( | |
user_details['firstName']) | |
if 'lastName' in user_details.keys(): | |
user_details['lastName'] = self._get_name(user_details['lastName']) | |
if 'profilePicture' in user_details.keys(): | |
user_details['profilePicture'] = self._get_profile_picture( | |
user_details['profilePicture']) | |
return user_details | |
def get_emails(self): | |
""" | |
Get the list of all the emails associated with Linkedin Account | |
""" | |
response = self.get_json(self.user_emails_url(), | |
headers=self.get_headers()) | |
self.process_error(response) | |
email_addresses = [] | |
for element in response.get('elements', []): | |
email_address = element.get('handle~', {}).get('emailAddress') | |
email_addresses.append(email_address) | |
return list(filter(None, email_addresses)) | |
def _get_email(self): | |
""" | |
Get primary/first email address associated with Linkedin Account | |
""" | |
emails = self.get_emails() | |
if emails: | |
return emails[0] | |
def _get_name(self, name): | |
""" | |
FirstName & Last Name object | |
{ | |
"localized":{ | |
"en_US":"Smith" | |
}, | |
"preferredLocale":{ | |
"country":"US", | |
"language":"en" | |
} | |
} | |
:return the localizedName from the lastName object | |
""" | |
locale = "{}_{}".format( | |
name["preferredLocale"]["language"], | |
name["preferredLocale"]["country"] | |
) | |
return name['localized'].get(locale, '') | |
def _get_profile_picture(self, pictures): | |
""" | |
Get the first profile picture that matches the dimensions | |
""" | |
elements = pictures.get('displayImage~', {}).get('elements', []) | |
for element in elements: | |
if element.get('authorizationMethod') == 'PUBLIC': | |
storage_size = element.get('data', {}).get( | |
'com.linkedin.digitalmedia.mediaartifact.StillImage', | |
{}).get('storageSize', {}) | |
if storage_size.get('height') == self.PROFILE_PICTURE_HEIGHT \ | |
and storage_size.get('width') == self.PROFILE_PICTURE_WIDTH: | |
for identifier in element.get('identifiers', []): | |
if identifier.get('identifierType') == 'EXTERNAL_URL': | |
return identifier.get('identifier') | |
def get_json(self, url, *args, **kwargs): | |
return requests.get(url, *args, **kwargs).json() | |
def process_error(self, data): | |
if data.get('serviceErrorCode'): | |
raise LookupError(data.get('message') or data.get('status')) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment