Skip to content

Instantly share code, notes, and snippets.

@karolzak
Created March 30, 2021 09:50
Show Gist options
  • Save karolzak/40361036bbbf473695d3959848d9a71b to your computer and use it in GitHub Desktop.
Save karolzak/40361036bbbf473695d3959848d9a71b to your computer and use it in GitHub Desktop.
Azure Video Indexer Python client
# Original source code: https://github.com/bklim5/python_video_indexer_lib
import re
import time
import datetime
import requests
def get_retry_after_from_message(message, default_seconds=30):
    """Extract the suggested wait time from a throttling error message.

    Looks for the phrase ``Try again in <N> second`` anywhere in *message*
    and returns ``N`` as an integer.

    Args:
        message: Error text returned by the service. ``None`` or an empty
            string is accepted and treated as "no hint available".
        default_seconds: Value returned when the message carries no
            recognisable hint. Defaults to 30.

    Returns:
        The number of seconds to wait before retrying.
    """
    # `message or ''` keeps re.search from raising TypeError on None.
    match = re.search(r'Try again in (\d+) second', message or '')
    if match is None:
        return default_seconds
    return int(match.group(1))
class VideoIndexer:
    """Small client for the Azure Video Indexer REST API.

    An access token is requested when the instance is created and is
    re-requested by ``check_access_token`` once it is older than
    ``TOKEN_MAX_AGE``. Every request method passes that token as the
    ``accessToken`` query parameter.

    Attributes:
        vi_subscription_key: Subscription key sent in the
            ``Ocp-Apim-Subscription-Key`` header when requesting a token.
        vi_location: Location segment of every endpoint URL.
        vi_account_id: Account id segment of every endpoint URL.
        access_token: Most recently obtained access token.
        access_token_timestamp: Local ``datetime`` at which
            ``access_token`` was obtained.
        video_name_to_id_dict: Lazily built ``{video name: video id}``
            cache; ``None`` until first needed.
    """

    # Root of every REST endpoint used by this client.
    API_BASE_URL = 'https://api.videoindexer.ai'
    # Tokens older than this are replaced before the next request.
    # NOTE(review): presumably chosen to stay below the service-side token
    # lifetime - confirm against the API documentation.
    TOKEN_MAX_AGE = datetime.timedelta(minutes=50)
    # Total number of POST attempts made for a single upload.
    MAX_UPLOAD_ATTEMPTS = 5

    def __init__(self, vi_subscription_key, vi_location, vi_account_id):
        """Store the account settings and fetch an initial access token.

        Args:
            vi_subscription_key: API subscription key.
            vi_location: Location segment used in endpoint URLs.
            vi_account_id: Video Indexer account id.
        """
        self.vi_subscription_key = vi_subscription_key
        self.vi_location = vi_location
        self.vi_account_id = vi_account_id
        self.access_token = None
        self.access_token_timestamp = None
        self.video_name_to_id_dict = None
        self.get_access_token()

    def _account_url(self, path='', auth=False):
        """Return the endpoint URL for this account, optionally under /auth."""
        url = '{base}/{prefix}{loc}/Accounts/{acc_id}'.format(
            base=self.API_BASE_URL,
            prefix='auth/' if auth else '',
            loc=self.vi_location,
            acc_id=self.vi_account_id
        )
        if path:
            url = '{}/{}'.format(url, path)
        return url

    def get_access_token(self):
        """Request a fresh account access token and cache it.

        Updates ``access_token`` and ``access_token_timestamp``.

        Returns:
            The new access token.

        Raises:
            requests.HTTPError: If the token endpoint answers with an error
                status. Without this check the error body would silently
                be stored as the token.
        """
        print('Getting video indexer access token...')
        access_token_req = requests.get(
            self._account_url('AccessToken', auth=True),
            params={'allowEdit': 'true'},
            headers={'Ocp-Apim-Subscription-Key': self.vi_subscription_key}
        )
        access_token_req.raise_for_status()
        # The body is a JSON-encoded string; decoding it removes the
        # surrounding quotes. The token itself is a credential and is
        # deliberately not printed.
        access_token = access_token_req.json()
        self.access_token = access_token
        self.access_token_timestamp = datetime.datetime.now()
        return access_token

    def check_access_token(self):
        """Refresh the cached token if it is missing or too old."""
        obtained_at = self.access_token_timestamp
        if (
            obtained_at is None
            or datetime.datetime.now() - obtained_at > self.TOKEN_MAX_AGE
        ):
            self.get_access_token()

    def get_all_videos_list(self):
        """Return every video of the account by following result pages.

        Starts with a page size of 200 and then uses the ``skip`` and
        ``pageSize`` values the service returns in ``nextPage`` until it
        reports ``done``.

        Returns:
            A list with the concatenated ``results`` of all pages.
        """
        all_videos_list = []
        skip = 0
        page_size = 200
        while True:
            response = self.get_videos_list(page_size=page_size, skip=skip)
            all_videos_list.extend(response['results'])
            next_page = response['nextPage']
            if next_page['done']:
                return all_videos_list
            skip = next_page['skip']
            page_size = next_page['pageSize']

    def get_videos_list(self, page_size=25, skip=0):
        """Fetch one page of the account's video list.

        Args:
            page_size: Value sent as the ``pageSize`` query parameter.
            skip: Value sent as the ``skip`` query parameter.

        Returns:
            The decoded JSON response.
        """
        self.check_access_token()
        params = {
            'accessToken': self.access_token,
            'pageSize': page_size,
            'skip': skip
        }
        print('Getting videos list..')
        videos_req = requests.get(self._account_url('Videos'), params=params)
        return videos_req.json()

    def upload_to_video_indexer(
        self, video_url, name,
        force_upload_if_exists=False,
        video_language='English', streaming_preset='Default',
        indexing_preset='Default'
    ):
        """Upload a video and return its Video Indexer id.

        Args:
            video_url: Either an ``http://``/``https://`` URL, which is
                passed to the service as ``videoUrl``, or a local file path,
                whose content is posted as the ``file`` form field.
            name: Name to give the video.
            force_upload_if_exists: When false (default) and a video with
                the same name is already known, its id is returned and
                nothing is uploaded.
            video_language: Sent as the ``language`` query parameter.
            streaming_preset: Sent as the ``streamingPreset`` parameter.
            indexing_preset: Sent as the ``indexingPreset`` parameter.

        Returns:
            The ``id`` field of the upload response, or the cached id of an
            existing video with the same name.

        Raises:
            Exception: If the service keeps throttling after
                ``MAX_UPLOAD_ATTEMPTS`` attempts or answers with any status
                other than 200 or 429.
        """
        self.check_access_token()
        if self.video_name_to_id_dict is None:
            self.get_video_name_to_id_dict()
        if name in self.video_name_to_id_dict:
            print("Video with the same name already exists in current Video Indexer account.")  # NOQA E501
            if not force_upload_if_exists:
                return self.video_name_to_id_dict[name]
            print("'force_upload_if_exists' set to 'True' so uploading the file anyway.")  # NOQA E501

        print('Uploading video to video indexer...')
        params = {
            'streamingPreset': streaming_preset,
            'indexingPreset': indexing_preset,
            'language': video_language,
            'name': name,
            'accessToken': self.access_token
        }
        video_file = None
        # Only a real URL scheme counts as remote; a substring test would
        # misclassify local paths such as "http_tutorial.mp4".
        if video_url.lower().startswith(('http://', 'https://')):
            params['videoUrl'] = video_url
        else:
            video_file = open(video_url, 'rb')
        try:
            upload_video_req = self._post_upload(params, video_file)
        finally:
            # Close the local file whether the upload succeeded or raised.
            if video_file is not None:
                video_file.close()

        video_id = upload_video_req.json()['id']
        # Keep the cache current so a later upload with the same name in
        # this session is recognised as a duplicate.
        self.video_name_to_id_dict[name] = video_id
        return video_id

    def _post_upload(self, params, video_file):
        """POST the upload, sleeping and retrying while throttled (429)."""
        for _ in range(self.MAX_UPLOAD_ATTEMPTS):
            files = {}
            if video_file is not None:
                # A previous attempt has consumed the stream; rewind so the
                # retry does not send an empty body.
                video_file.seek(0)
                files = {'file': video_file}
            upload_video_req = requests.post(
                self._account_url('Videos'),
                params=params,
                files=files
            )
            if upload_video_req.status_code == 200:
                return upload_video_req
            # hit throttling limit, sleep and retry
            if upload_video_req.status_code == 429:
                error_resp = upload_video_req.json()
                print('Throttling limit hit. Error message: {}'.format(
                    error_resp.get('message')))
                retry_after = get_retry_after_from_message(
                    error_resp.get('message'))
                time.sleep(retry_after + 1)
                continue
            print('Error uploading video to video indexer: {}'.format(
                upload_video_req.json()))
            raise Exception('Error uploading video to video indexer')
        raise Exception('Retry count exceeded.')

    def get_video_info(self, video_id, video_language='English'):
        """Fetch the index of a video.

        The full response is printed; when its ``state`` is ``Processing``
        the current ``processingProgress`` is printed as well.

        Args:
            video_id: Id of the video.
            video_language: Sent as the ``language`` query parameter.

        Returns:
            The decoded JSON response.
        """
        self.check_access_token()
        params = {
            'accessToken': self.access_token,
            'language': video_language
        }
        print('Getting video info for: {}'.format(video_id))
        get_video_info_req = requests.get(
            self._account_url('Videos/{}/Index'.format(video_id)),
            params=params
        )
        response = get_video_info_req.json()
        print(response)
        # .get() so a payload without 'state' (e.g. an error body) is
        # returned to the caller instead of raising KeyError here.
        if response.get('state') == 'Processing':
            print('Video still processing, current status: {}'.format(
                response['videos'][0]['processingProgress'],
            ))
        return response

    def get_scenes_for_video(self, video_id):
        """Return the ``scenes`` insight of the first video in the index."""
        info = self.get_video_info(video_id=video_id)
        return info['videos'][0]['insights']['scenes']

    def get_caption_from_video_indexer(
        self, video_id, caption_format='vtt', video_language='English'
    ):
        """Download the captions of a video.

        Args:
            video_id: Id of the video.
            caption_format: Sent as the ``format`` query parameter.
            video_language: Sent as the ``language`` query parameter.

        Returns:
            The raw response body as bytes.
        """
        self.check_access_token()
        print('Getting caption from video: {}'.format(video_id))
        params = {
            'accessToken': self.access_token,
            'format': caption_format,
            'language': video_language
        }
        caption_req = requests.get(
            self._account_url('Videos/{}/Captions'.format(video_id)),
            params=params
        )
        return caption_req.content

    def get_thumbnail_from_video_indexer(self, video_id, thumbnail_id):
        """Download one thumbnail of a video.

        Args:
            video_id: Id of the video.
            thumbnail_id: Id of the thumbnail.

        Returns:
            The raw response body as bytes.
        """
        # Refresh the token first, like every other request method.
        self.check_access_token()
        print('Getting thumbnail from video: {}, thumbnail: {}'.format(
            video_id, thumbnail_id))
        params = {
            'accessToken': self.access_token
        }
        thumbnail_req = requests.get(
            self._account_url(
                'Videos/{}/Thumbnails/{}'.format(video_id, thumbnail_id)),
            params=params
        )
        return thumbnail_req.content

    def get_video_name_to_id_dict(self):
        """Rebuild and return the ``{video name: video id}`` cache.

        When several videos share a name, the one listed last wins.
        """
        all_videos = self.get_all_videos_list()
        self.video_name_to_id_dict = {
            video['name']: video['id'] for video in all_videos
        }
        return self.video_name_to_id_dict

    def extract_summary_from_video_indexer_info(self, info):
        """Condense a video index response into a flat summary dict.

        Args:
            info: Index response as returned by ``get_video_info``. It must
                contain ``durationInSeconds``, ``summarizedInsights`` and a
                non-empty ``videos`` list whose first entry has
                ``insights``. Optional sections (keywords, statistics,
                sentiments, transcript) are treated as empty when absent.

        Returns:
            A dict with the keys ``durationInSeconds``,
            ``numberOfKeywords``, ``keywords``, ``sumOfWordCount``,
            ``sentimentSeenDurationRatio``, ``sentimentScore`` and
            ``transcript``.
        """
        sum_ins = info['summarizedInsights']
        insights = info['videos'][0]['insights']
        keywords = sum_ins.get('keywords', [])
        speaker_word_count = sum_ins.get(
            'statistics', {}).get('speakerWordCount', {})

        transcript = []
        for line in insights.get('transcript', []):
            # Whitespace-separated token count of the transcript line.
            word_count = len(line['text'].split())
            transcript.append({
                'confidence': line['confidence'],
                'text': line['text'],
                'textLength': word_count,
                'confidencePerText': line['confidence'] * word_count
            })

        return {
            'durationInSeconds': info['durationInSeconds'],
            'numberOfKeywords': len(keywords),
            'keywords': keywords,
            'sumOfWordCount': sum(speaker_word_count.values()),
            'sentimentSeenDurationRatio': {
                x['sentimentKey']: x['seenDurationRatio']
                for x in sum_ins.get('sentiments', [])
            },
            'sentimentScore': {
                x['sentimentType']: x['averageScore']
                for x in insights.get('sentiments', [])
            },
            'transcript': transcript
        }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment