Last active
May 16, 2023 06:20
-
-
Save daigles/ff958b8b3ed695329d371e5d500acb0a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import io | |
import os | |
import requests | |
import shutil | |
import time | |
class ShotgunMediaIO: | |
# Note: chunk_size must be greater than 5,000,000 | |
# This is the byte size at which file uploads will switch over to mulitpart uploads. | |
chunk_size = 10000000 | |
@staticmethod | |
def is_transient(url): | |
if not url: | |
return False | |
return "/images/status/transient/" in url | |
# Creates a MediaIO object. | |
# @param host [String] base URL of the Shotgun site | |
# @param credentials [Hash] credentials to use to access the site's API endpoint | |
# @option credentials [String] 'login' User login. To be used with 'password' | |
# @option credentials [String] 'password' user password | |
# @option credentials [String] 'client_id' api script name. To be used with 'client_secret' | |
# @option credentials [String] 'client_secret' api script secret | |
def __init__(self, host, credentials, debug = False): | |
self.host = host | |
self.credentials = credentials | |
self.access_token = None | |
self.refresh_token = None | |
self.access_expires_at = None | |
self.refresh_expires_at = None | |
self.debug = debug | |
# Upload a file to the Shotgun site and creates an Attachment entity associated to the given entity. | |
# @param entity_type [String] type of the entity the media will be attached to | |
# @param record_id [number] id of the entity the media will be attached to | |
# @param field [String] field of the entity the media will be attached to. This can be a URL field or None. If None, | |
# the media is uploaded and the File (Attachment) entity is linked to the given entity. | |
# @param local_file_path [String] The path to the file to upload. | |
# @param content_type [String] MIME type of the file to upload. This will be associated to the uploaded file. | |
def upload(self, entity_type, record_id, field, local_file_path, content_type): | |
filename = os.path.basename(local_file_path) | |
fd = open(local_file_path, "rb") | |
fsize = os.path.getsize(local_file_path) | |
success = False | |
if fsize <= self.chunk_size: | |
success = self.__upload(entity_type, record_id, field, filename, fd, content_type) | |
else: | |
success = self.__upload_multipart(entity_type, record_id, field, filename, fd, content_type) | |
fd.close() | |
return success | |
# Gets a signed URL that can be used to download a media file. | |
# @param entity_type [String] type of the entity the media is attached to | |
# @param record_id [number] id of the entity the media is attached to | |
# @param field [String] field of the entity the media is attached to | |
def get_download_url(self, entity_type, record_id, field="image"): | |
headers = self.__setup_auth_headers() | |
headers["Accept"] = "application/json" | |
url = "%s/api/v1/entity/%s/%s/%s" % (self.host, entity_type, record_id, field) | |
result = None | |
try: | |
resp = requests.get(url, headers=headers) | |
resp.raise_for_status() | |
result = resp.json()["data"] | |
if isinstance(result, dict): | |
result = result["url"] | |
except requests.exceptions.RequestException as e: | |
self.__debug("Could not get the download url: %s" % e) | |
if not result: | |
result = "" | |
return result | |
# Download a media file. | |
# @param entity_type [String] type of the entity the media is attached to | |
# @param record_id [number] id of the entity the media is attached to | |
# @param field [String] field of the entity the media is attached to | |
# @param local_file_path [String] local file path that will contain the media | |
def download(self, entity_type, record_id, field, local_file_path): | |
url = self.get_download_url(entity_type, record_id, field) | |
if not url: | |
return None | |
result = False | |
try: | |
with requests.get(url, stream=True) as resp: | |
resp.raise_for_status() | |
with open(local_file_path, "wb") as fd: | |
shutil.copyfileobj(resp.raw, fd) | |
result = True | |
except requests.exceptions.RequestException as e: | |
self.__debug("Could not download: %s" % e) | |
return result | |
# Downloads the thumbnail associated to the given entity. | |
# @param entity_type [String] type of the entity the media is attached to | |
# @param record_id [number] id of the entity the media is attached to | |
# @param size [String] 'original' or 'thumbnail'. | |
# @param local_file_path [String] local path where to download the file | |
def download_thumbnail(self, entity_type, record_id, size, local_file_path): | |
headers = self.__setup_auth_headers() | |
headers["Accept"] = "application/json" | |
url = "%s/api/v1/entity/%s/%s/image?alt=%s"%(self.host, entity_type, record_id, size) | |
result = False | |
try: | |
resp = requests.get(url, allow_redirects=True, headers=headers) | |
open(local_file_path, "wb").write(resp.content) | |
result = True | |
except requests.exceptions.RequestException as e: | |
self.__debug("Could not download thumbnail: %s" % e) | |
return result | |
# Private | |
def __upload(self, entity_type, record_id, field, filename, fd, content_type): | |
# Get the upload info from SG (including the target URL) | |
upload_metadata = self.__get_upload_metadata(filename, entity_type, record_id, field) | |
if not upload_metadata: | |
return False | |
# Upload the data to the received target URL | |
upload_link = upload_metadata["links"]["upload"] | |
upload_response = self.__upload_data(upload_link, content_type, fd) | |
if not upload_response: | |
return False | |
# Complete the upload process | |
result = self.__complete_upload(upload_response, upload_metadata, filename) | |
if not result: | |
self.__debug("Could not complete the upload... aborting") | |
return False | |
return True | |
def __upload_multipart(self, entity_type, record_id, field, filename, fd, content_type): | |
# Get the upload info from SG (including the target URL for the first part to upload) | |
upload_metadata = self.__get_upload_metadata(filename, entity_type, record_id, field, multipart_upload=True) | |
if not upload_metadata: | |
self.__debug("Multipart uploads not supported for this storage type... (this is only supported for uploads to S3)") | |
return False | |
# Upload the data to the received target URL | |
upload_link = upload_metadata["links"]["upload"] | |
next_part_link = upload_metadata["links"]["get_next_part"] | |
etags = [] | |
while True: | |
upload_response = self.__upload_data(upload_link, content_type, fd, self.chunk_size) | |
if not upload_response: | |
break | |
etags.append(upload_response["e_tag"]) | |
if upload_response["nb_bytes_uploaded"] < self.chunk_size: | |
break | |
upload_part_metadata = self.__get_next_upload_part_metadata(next_part_link) | |
upload_link = upload_part_metadata["links"]["upload"] | |
next_part_link = upload_part_metadata["links"]["get_next_part"] | |
if not upload_response: | |
self.__debug("Could not upload data... aborting") | |
self.__abort_multipart_upload(upload_metadata) | |
return False | |
# Complete the upload process | |
upload_metadata["data"]["etags"] = etags | |
result = self.__complete_upload(upload_response, upload_metadata, filename) | |
if not result: | |
self.__debug("Could not complete the upload... aborting") | |
self.__abort_multipart_upload(upload_metadata) | |
return False | |
return True | |
def __get_upload_metadata(self, filename, entity_type, record_id, field, multipart_upload=False): | |
headers = self.__setup_auth_headers() | |
headers["Accept"] = "application/json" | |
params = {"filename": filename} | |
if multipart_upload: | |
params["multipart_upload"] = True | |
if field: | |
url = "%s/api/v1/entity/%s/%s/%s/_upload"% (self.host, entity_type, record_id, field) | |
else: | |
url = "%s/api/v1/entity/%s/%s/_upload"% (self.host, entity_type, record_id) | |
result = None | |
try: | |
resp = requests.get(url, params=params, headers=headers) | |
resp.raise_for_status() | |
result = resp.json() | |
except requests.exceptions.RequestException as e: | |
self.__debug("Could not get upload metadata. Aborting...: %s" % e) | |
return result | |
def __upload_data(self, upload_link, content_type, fd, max_bytes=None): | |
headers = {"Content-Type": content_type} | |
fd_temp = fd | |
if max_bytes: | |
fd_temp = io.BytesIO() | |
fd_temp.write(fd.read(max_bytes)) | |
fd_temp.seek(0) | |
result = None | |
for i in range(5): | |
self.__debug("Uploading to: %s (nb_tries: %s)" % (upload_link, i+1)) | |
try: | |
resp = requests.put(upload_link, data=fd_temp, headers=headers) | |
resp.raise_for_status() | |
result = resp.json() if resp.text else {"data": {}} | |
result["nb_bytes_uploaded"] = int(resp.request.headers["Content-Length"]) | |
result["e_tag"] = resp.headers["ETag"] | |
break | |
except requests.exceptions.RequestException as e: | |
self.__debug("upload_data: Unexpected result: %s" % e) | |
fd_temp.seek(0) | |
print "Could not upload data. Retrying in 3 seconds..." | |
time.sleep(3) | |
return result | |
def __get_next_upload_part_metadata(self, get_next_part_url): | |
headers = self.__setup_auth_headers() | |
headers["Accept"] = "application/json" | |
url = self.host + get_next_part_url | |
result = None | |
try: | |
resp = requests.get(url, headers=headers) | |
resp.raise_for_status() | |
result = resp.json() | |
except requests.exceptions.RequestException as e: | |
self.__debug("get_next_upload_part_metadata: Unexpected result: %s" % e) | |
return result | |
def __complete_upload(self, upload_response, upload_metadata, filename): | |
complete_upload_info = upload_metadata["data"] | |
if upload_metadata["data"]["storage_service"] == "sg": | |
complete_upload_info["upload_id"] = upload_response["data"]["upload_id"] | |
complete_upload_data = { | |
"upload_info": complete_upload_info, | |
"upload_data": { | |
"display_name": filename | |
} | |
} | |
complete_upload_link = self.host + upload_metadata["links"]["complete_upload"] | |
headers = self.__setup_auth_headers() | |
headers["Content-Type"] = "application/json" | |
headers["Accept"] = "application/json" | |
result = False | |
try: | |
resp = requests.post(complete_upload_link, json=complete_upload_data, headers=headers) | |
resp.raise_for_status() | |
result = True | |
except requests.exceptions.RequestException as e: | |
self.__debug("Could not complete the upload: %s" % e) | |
return result | |
def __abort_multipart_upload(self, upload_metadata): | |
upload_info = upload_metadata["data"] | |
headers = self.__setup_auth_headers() | |
headers["Content-Type"] = "application/json" | |
headers["Accept"] = "application/json" | |
url = "%s%s/multipart_abort" % (self.host, upload_metadata["links"]["complete_upload"]) | |
result = False | |
try: | |
resp = requests.post(url, headers=headers) | |
resp.raise_for_status() | |
result = True | |
except requests.exceptions.RequestException as e: | |
self.__debug("Could not abort the upload: %s" % e) | |
return result | |
def __setup_auth_headers(self): | |
if not self.access_expires_at: | |
self.__get_access_token() | |
elif self.access_expires_at <= datetime.datetime.now(): | |
if self.__refresh_expires_at <= datetime.datetime.now(): | |
self.__get_access_token() | |
else: | |
self.__get_access_token(grant_type='refresh') | |
return { 'Authorization': self.access_token } | |
def __get_access_token(self, grant_type = None): | |
self.access_token = None | |
self.refresh_token = None | |
self.access_expires_at = None | |
self.refresh_expires_at = None | |
headers = { | |
"Content-Type": "application/x-www-form-urlencoded", | |
"Accept": "application/json" | |
} | |
params = None | |
if grant_type == 'refresh': | |
params = { | |
"grant_type": "refresh", | |
"refresh_token": self.refresh_token | |
} | |
if ("login" in self.credentials) and ("password" in self.credentials): | |
params={ | |
"username": self.credentials["login"], | |
"password": self.credentials["password"], | |
"grant_type": "password" | |
} | |
elif ("client_id" in self.credentials) and ("client_secret" in self.credentials): | |
params={ | |
"client_id": self.credentials["client_id"], | |
"client_secret": self.credentials["client_secret"], | |
"grant_type": "client_credentials" | |
} | |
else: | |
self.__debug("credentials format not supported") | |
return None | |
result = None | |
try: | |
resp = requests.post("%s/api/v1/auth/access_token" % self.host, params=params, headers=headers) | |
resp.raise_for_status() | |
result = resp.json() | |
self.access_token = "%s %s"%(result["token_type"], result["access_token"]) | |
self.refresh_token = result['refresh_token'] | |
self.access_expires_at = datetime.datetime.now() + datetime.timedelta(0,result['expires_in']) | |
self.refresh_expires_at = datetime.datetime.now() + datetime.timedelta(0,24*60*60) | |
except requests.exceptions.RequestException as e: | |
self.__debug("Cannot get access token. Unexpected result: %s" % e) | |
return result | |
def __debug(self, *args): | |
if not self.debug: | |
return | |
print(" DEBUG: " + " ".join(map(str,args))) | |
if __name__ == '__main__': | |
######################### | |
host = "https://yoursite.shotgunstudio.com" | |
login = "XXXXX" | |
password = "XXXXX" | |
sg_media_io = ShotgunMediaIO(host, {"login":login, "password": password}, debug = True) | |
########################### | |
target_entity = "Version" | |
target_record_id = 712 | |
print "==============================================" | |
print "URL field workflow" | |
print "---------------------------------------" | |
print "Try to get the download URL of an empty link" | |
download_url = sg_media_io.get_download_url(target_entity, target_record_id, "sg_uploaded_movie") | |
print "download url: %s" % download_url | |
print "---------------------------------------" | |
print "Try to download an empty link" | |
result = sg_media_io.download(target_entity, target_record_id, "sg_uploaded_movie", "./downloaded_Sourissimo.mp4" ) | |
print "result: %s" % result | |
print "---------------------------------------" | |
print "Upload data" | |
result = sg_media_io.upload(target_entity, target_record_id, "sg_uploaded_movie", "./Sourissimo.mp4", "video/mp4" ) | |
print "result: %s" % result | |
print "---------------------------------------" | |
print "Download url for uploaded media" | |
download_url = sg_media_io.get_download_url(target_entity, target_record_id, "sg_uploaded_movie") | |
print("download url: %s" % download_url) | |
print "---------------------------------------" | |
print "Download media" | |
result = sg_media_io.download(target_entity, target_record_id, "sg_uploaded_movie", "./downloaded_Sourissimo.mp4" ) | |
print "result: %s" % result | |
print "---------------------------------------" | |
print "==============================================" | |
print "Linked media workflow" | |
print "No field name: The file will be uploaded as a File (Attachment) entity linked to the given entity" | |
result = sg_media_io.upload(target_entity, target_record_id, None, "./sourissimo.jpeg", "image/jpeg") | |
print "result: %s" % result | |
print "---------------------------------------" | |
print "==============================================" | |
print "Thumbnail workflow" | |
print "Try download (small) thumbnail associated to an entity" | |
result = sg_media_io.download_thumbnail(target_entity, target_record_id, "thumbnail", "./downloaded_small_Sourissimo.jpg" ) | |
print "result: %s" % result | |
print "Try download (original) thumbnail associated to an entity" | |
result = sg_media_io.download_thumbnail(target_entity, target_record_id, "original", "./downloaded_large_Sourissimo.jpg" ) | |
print "result: %s" % result | |
print "---------------------------------------" | |
print "Try to get the download url for the small thumbnail for the entity" | |
result = download_url = sg_media_io.get_download_url(target_entity, target_record_id) | |
print("download url: #{download_url} (transient? #{Shotgun::MediaIO.is_transient? download_url})") | |
print "---------------------------------------" | |
print "Uploading to an image field: This will associate the thumbnail to the given entity" | |
# Note: Once the upload is completed, a process to generate a "small thumbnail" will automatically be started by Shotgun. | |
# That small thumbnail should be available to use within a few minutes... | |
result = sg_media_io.upload(target_entity, target_record_id, "image", "./sourissimo.jpeg", "image/jpeg" ) | |
print "result: %s" % result | |
print "---------------------------------------" | |
print "Download url for the small thumbnail for the entity" | |
download_url = sg_media_io.get_download_url(target_entity, target_record_id, "image") | |
print( "download url: %s (transient? %s)"%(download_url, sg_media_io.is_transient(download_url))) | |
print "---------------------------------------" | |
print "Download url for the small thumbnail for the entity" | |
download_url = sg_media_io.get_download_url(target_entity, target_record_id) | |
print( "download url: %s (transient? %s)"%(download_url, sg_media_io.is_transient(download_url))) | |
print "---------------------------------------" | |
print "download (small) thumbnail uploaded for that entity" | |
result = sg_media_io.download_thumbnail(target_entity, target_record_id, "thumbnail", "./downloaded_small_Sourissimo.jpg" ) | |
print "result: %s" % result | |
print "download (original) thumbnail uploaded for that entity" | |
result = sg_media_io.download_thumbnail(target_entity, target_record_id, "original", "./downloaded_large_Sourissimo.jpg" ) | |
print "result: %s" % result | |
print "---------------------------------------" | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment