Skip to content

Instantly share code, notes, and snippets.

@portedison
Last active August 29, 2024 04:25
Show Gist options
  • Save portedison/320ef685df5c02f34b2722af69b45a18 to your computer and use it in GitHub Desktop.
Save portedison/320ef685df5c02f34b2722af69b45a18 to your computer and use it in GitHub Desktop.
bru / textures
# This is an example of a fabric texture (for 3d models) download client I created for a Python / Django site,
# 1. when a user requests a texture download, it creates a task using Celery,
# 2. this task creates a RequestTexture item in the database
# 3. if the textures have never been requested before -
# it retrieves the texture images (presigned files that expire) from an external bucket
# 4. once I have selected and pulled them to a temp/ folder I zip them up (with spec sheet pdf)
# 5. optionally (on the server, or locally if USE_AWS_S3_MEDIA is set) I push them onto our bucket
# 6. lastly, I send an email to the download view, which triggers an automatic download
@current_app.task(
rate_limit='24/h',
retry_backoff=True,
max_retries=settings.TASK_MAX_RETRIES_DEFAULT)
def process_texture_request(product_id, user_id, email, verbosity=1):
"""
Sends textures to customers
"""
client = BruTextureClient(verbosity)
# first check if we have any requests that are not resolved
# if so do nothing, it's likely the request was made twice
current_request_textures = RequestTexture.objects.filter(
user_id=user_id,
product_id=product_id,
email=email,
status__in=[RequestTexture.STATUS_PENDING,
RequestTexture.STATUS_PROCESSING]
)
if not current_request_textures:
request_texture = RequestTexture.objects.create(
user_id=user_id,
email=email,
product_id=product_id
)
client.process_customer_request_for_textures(
request_texture=request_texture
)
import time
import datetime
import json
from time import sleep
import zipfile
import shutil
import traceback
import requests
from django.conf import settings
from .utils import log_error, log_info
from django.core.files.base import ContentFile
from django.core.files.storage import FileSystemStorage
from django.core.files.storage import default_storage
from customers.models import RequestTexture
from django.core.mail import EmailMultiAlternatives
from django.template.loader import render_to_string
from project.storage_backends import ExternalDataStorage, PrivateMediaStorage, PublicMediaStorage
from productsheet.pdf import get_pdf
from catalogue.models import Product
PREVENT_POST = getattr(settings, 'DEBUG', False)
MAXIMUM_RETRIES = getattr(settings, "BRU_SAP_API_MAX_RETRIES", 3)
REQUEST_INTERVAL = getattr(settings, "BRU_SAP_API_REQUEST_INTERVAL", 1)
PAGE_SIZE = getattr(settings, "BRU_SAP_API_PAGE_SIZE", 250)
# default_storage gets set to PublicMediaStorage on AWS server
# so we need to explicity request the FileSystemStorage in order
# to save/process the zip on the task/site server, we then push
# the content, to the preferred bucket
os_storage = FileSystemStorage()
s3_storage = None
if getattr(settings, "USE_AWS_S3_MEDIA", False):
s3_storage = PublicMediaStorage()
def build_error_msg_from_dict(error_data):
message = error_data.get('message', {}).get('value', 'An unexpected error occurred')
err_type = error_data.get('status_code', 500)
return '{}: {}'.format(err_type, message)
class BruTextureClient:
"""
Basic API client that handles retrieving payloads from Bru's Texture bucket
Some info:
https://basecamp.com/2214113/projects/3101619/todos/481796808#comment_900927474
These get stored here:
https://s3.console.aws.amazon.com/s3/buckets/media-*
"""
token = getattr(settings, "BRU_TEXTURE_API_TOKEN", None)
if not token:
raise Exception("Please specify BRU_TEXTURE_API_TOKEN in settings.")
base_api_url = getattr(settings, "BRU_TEXTURE_API_URL", None)
if not base_api_url:
raise Exception("Please specify BRU_TEXTURE_API_URL in settings.")
endpoints = {
"textures": "/default/GetJDMaterials",
}
def __init__(self, verbosity=1, fail_silently=True, *args, **kwargs):
self.verbosity = verbosity
self.fail_silently = fail_silently
self.session = requests.Session()
self.paths = {}
super().__init__(*args, **kwargs)
def get_headers(self):
return {
"x-api-key": self.token,
}
def make_request(self, method, url, headers={}, data=None, params=None, retries=1):
params = params or {}
headers.update(self.get_headers())
try:
response = getattr(self.session, method.lower())(
url, params=params, json=data, headers=headers)
response.raise_for_status()
return response
except (requests.exceptions.ConnectionError,
requests.exceptions.Timeout) as e:
if retries == 1:
message = "{} during request to {}.".format(
type(e).__name__, url)
log_info(message)
elif retries > MAXIMUM_RETRIES:
raise e
sleep(REQUEST_INTERVAL)
message = "Polling {} after {} second delay. {} retries "\
"remaining".format(url, REQUEST_INTERVAL, MAXIMUM_RETRIES - retries)
log_info(message)
return self.make_request(method, url, headers, data=data, params=params, retries=retries+1)
except requests.exceptions.HTTPError as e:
response = getattr(e, "response", None)
if response is None:
raise e
if response.status_code in [502, 504, 524]:
if retries == 1:
message = "{} during request to {}.".format(
type(e).__name__, url)
log_info(message)
elif retries > MAXIMUM_RETRIES:
raise e
sleep(REQUEST_INTERVAL)
message = "Retrying {} after {} second delay. {} retries "\
"remaining".format(url, REQUEST_INTERVAL,
MAXIMUM_RETRIES - retries)
log_info(message)
return self.make_request(method, url, headers, data=data, params=params, retries=retries+1)
raise e
def request_endpoint(self, method, endpoint, data=None, params=None, retries=1):
params = params or {}
url = "{}{}".format(self.base_api_url, endpoint)
try:
response = self.make_request(method, url, params=params, data=data)
return response.json()
except Exception as e:
response = getattr(e, 'response', None)
status_code = response.status_code if response is not None else 500
try:
error_data = response.json().get("error", {})
error_data.update({
'status_code': status_code,
"error_instance": e
})
except (AttributeError, json.JSONDecodeError):
# If response is unavailable or contained no json, build
# our own error dict matching the expected format
error_data = {
"message": {
"lang": "en",
"value": str(e)
},
'status_code': status_code,
"error_instance": e,
}
message = 'request_endpoint ({}): {}'.format(
endpoint, build_error_msg_from_dict(error_data))
log_error(message)
if self.fail_silently:
print(error_data)
else:
raise e
def get_list(self):
# returns a list of all textures (not paginated yet)
params = {
'query': 'list'
}
return self.request_endpoint(
'get', self.endpoints.get('textures'), params=params)
def get_content(self, sku):
# returns list of textures paths for sku
params = {
'query': 'content',
'sku': sku
}
return self.request_endpoint(
'get', self.endpoints.get('textures'), params=params)
def generate_pre_signed_file(self, sku, path):
# returns a file for path - with an expiry date
params = {
'query': 'content',
'sku': sku,
'file': path
}
if self.verbosity > 1:
print(params)
return self.request_endpoint(
'get', self.endpoints.get('textures'), params=params)
def get_product(self, sku):
if not getattr(self, 'product', None):
try:
self.product = Product.objects.get(sap_material_id=sku)
except Exception as e:
message = 'get_textures_paths ({}): {}'.format(
sku, build_error_msg_from_dict({
"error_instance": str(e)
}))
log_error(message)
if self.fail_silently:
if self.verbosity > 1:
print(e)
return []
else:
raise e
return self.product
def get_folder_name(self):
return "textures/"
def get_zip_name(self, sku):
product = self.get_product(sku)
brand_name = "{}_".format(product.brand.name) if product.brand else ''
parent_name = "{}_".format(product.title.replace(' ', '_'))
name = "{}".format(product.subtitle.replace(' ', '_'))
return "{}{}{}.zip".format(brand_name, parent_name, name)
def get_temp_folder_name(self, sku):
return "{}temp/{}/".format(self.get_folder_name(), sku)
def get_temp_file_name(self, path):
return path.split('/')[-1]
def get_textures_paths(self, sku):
paths = []
try:
product = self.get_product(sku)
content_response = self.get_content(product.sap_material_id)
print(content_response)
except Exception as e:
message = 'get_textures_paths ({}): {}'.format(
sku, build_error_msg_from_dict({
"error_instance": str(e)
}))
log_error(message)
if self.fail_silently:
if self.verbosity > 1:
print(e)
return []
else:
raise e
# if we have 'vrscene/' files pick those,
# otherwise look for 'vizoo/textures' files
if any(
path.startswith('%s/%s/' % (product.sap_material_id, 'vrscene'))
for path in content_response.get('content', [])
):
subfolder_lookup = 'vrscene'
else:
subfolder_lookup = 'vizoo/textures'
for path in content_response.get('content', []):
if path.startswith('%s/%s/' % (product.sap_material_id, subfolder_lookup)):
if self.verbosity > 1:
print('%s : Keep path ' % path)
paths.append(path)
else:
if self.verbosity > 1:
print('%s : Skip path ' % path)
return paths
def get_or_create_temp_file(self, sku, pre_signed_url, filename):
start = time.time()
temp_file_path = "%s%s" % (
self.get_temp_folder_name(sku), filename)
if not os_storage.exists(temp_file_path):
# TODO multiple attempts?
if self.verbosity > 1:
print('Begin : {}'.format(pre_signed_url))
response = requests.get(pre_signed_url)
if self.verbosity > 1:
print('Requested : {}'.format(pre_signed_url))
if response.status_code != requests.codes.ok:
return
# don't need buffer here
# with BytesIO() as buffer:
# if self.verbosity > 1:
# print('Buffer : start')
# buffer.write(response.content)
# buffer.seek(0)
if self.verbosity > 1:
print('ContentFile : create')
temp_file = ContentFile(
# buffer.read(),
response.content,
name=self.get_temp_file_name(filename))
if self.verbosity > 1:
print('Default storage : save')
# prevent duplicates (if requested multiple times)
if os_storage.exists(temp_file_path):
os_storage.delete(temp_file_path)
os_storage.save(temp_file_path, temp_file)
end = time.time()
if self.verbosity > 1:
print('get_or_create_temp_file {} {}'.format(filename, end - start))
return {
'filename': filename,
'path': temp_file_path
}
def get_or_create_temp_textures(self, sku):
start = time.time()
pre_signed_files = []
files = []
# 1. generate_pre_signed_file
for path in self.get_textures_paths(sku):
pre_signed_file = self.generate_pre_signed_file(
sku,
path.replace('%s/' % sku, ''))
pre_signed_files.append({
'sku': sku,
'pre_signed_url': pre_signed_file.get('url'),
'filename': self.get_temp_file_name(path),
})
# 2. get_temp_file
for pre_signed_file in pre_signed_files:
temp_file = self.get_or_create_temp_file(**pre_signed_file)
if temp_file:
files.append(temp_file)
if self.verbosity > 1:
print('%s : Added' % pre_signed_file.get('filename'))
end = time.time()
if self.verbosity > 1:
print('get_textures_for_product {}'.format(end - start))
return files
def get_zip_of_textures(self, sku):
file_path = "%s%s" % (self.get_folder_name(), self.get_zip_name(sku))
if s3_storage:
if s3_storage.exists(file_path):
if self.verbosity > 1:
print('Return from S3 : {}'.format(file_path))
return s3_storage.url(file_path)
if self.verbosity > 1:
print('Failed Return from S3 : {}'.format(file_path))
if os_storage.exists(file_path):
if self.verbosity > 1:
print('Return from os : {}'.format(file_path))
return os_storage.url(file_path)
if self.verbosity > 1:
print("Doesn't exist : {}".format(file_path))
def get_or_create_zip_of_textures(self, sku, force=False):
file_path = "%s%s" % (self.get_folder_name(), self.get_zip_name(sku))
start = time.time()
if self.verbosity > 1:
print(file_path)
if s3_storage and s3_storage.exists(file_path) and not force:
if self.verbosity > 1:
print('Zip Exists (on S3 storage) : {}'.format(file_path))
elif os_storage.exists(file_path) and not force:
if self.verbosity > 1:
print('Zip Exists (on os storage) : {}'.format(file_path))
if s3_storage:
s3_storage.save(file_path, os_storage.open(file_path))
if s3_storage.exists(file_path):
if self.verbosity > 1:
print('Saved to S3 : {}'.format(file_path))
else:
raise Exception('Failed to carry {} over to s3'.format(file_path))
else:
# Zip doesn't exist, create
files = self.get_or_create_temp_textures(sku)
spec_sheet = self.get_or_create_spec_sheet(sku)
if not(len(files) > 0):
# This can happen when we have the content endpoint but for
# some reason, there is nothing in the list.
raise Exception('Error, no files')
if self.verbosity > 1:
print('{} : Zip Create'.format(sku))
print(f'{self.get_folder_name()}{sku}')
zip_file_path = "%s%s" % (
self.get_folder_name(), self.get_zip_name(sku))
if self.verbosity > 1:
print(zip_file_path)
with zipfile.ZipFile(
os_storage.path(zip_file_path),
mode='w', compression=zipfile.ZIP_DEFLATED, allowZip64=True) as zf:
for file_data in files:
if self.verbosity > 1:
print('File data')
print(file_data)
with os_storage.open(file_data.get('path'), 'rb') as source_file:
with zf.open(file_data.get('filename'), 'w') as dest_file:
for chunk in iter(lambda: source_file.read(1024 * 8), b''):
dest_file.write(chunk)
if self.verbosity > 1:
print('Spec sheet')
print(spec_sheet)
# note this file is already saved but on S3,
# and we don't want to remove it, so just read it
with default_storage.open(spec_sheet.get('path')) as file:
zf.writestr(spec_sheet.get('filename'), file.read())
if self.verbosity > 1:
print('Path : {}'.format(zip_file_path))
# Zip has been created, delete temporary files
for file_data in files:
if os_storage.exists(file_data.get('path')):
os_storage.delete(file_data.get('path'))
if s3_storage:
if s3_storage.exists(file_path):
s3_storage.delete(file_path)
s3_storage.save(file_path, os_storage.open(zip_file_path))
if s3_storage.exists(file_path):
if self.verbosity > 1:
print('Saved to S3 : {}'.format(file_path))
else:
raise Exception('Failed to carry {} over to s3'.format(file_path))
# Zip has been uploaded to S3, delete local copy
if os_storage.exists(zip_file_path):
os_storage.delete(zip_file_path)
end = time.time()
if self.verbosity > 1:
print('zip_textures_for_product {}'.format(end - start))
end = time.time()
if self.verbosity > 1:
print('get_or_create_zip_of_textures {}'.format(end - start))
def get_or_create_spec_sheet(self, sku):
start = time.time()
product = self.get_product(sku)
pdf = get_pdf(product, all_colors=False)
end = time.time()
if self.verbosity > 1:
print('get_or_create_spec_sheet {} {}'.format(sku, end - start))
return {
'path': pdf,
'filename': '{}.pdf'.format(sku)
}
def sync_textures(self):
start = time.time()
sku_list = self.get_list().get('skus', [])
textures_available = Product.objects.filter(
sap_material_id__in=sku_list).values_list('id', flat=True)
Product.objects.filter(
id__in=textures_available).update(textures_available=True)
# If there are any that are set to True, but
# shouldn't be then fix them up
Product.objects\
.filter(textures_available=True)\
.exclude(id__in=textures_available)\
.update(textures_available=False)
end = time.time()
if self.verbosity > 1:
print('sync_textures {}'.format(end - start))
# def clean_s3_storage(self, force_delete=False):
# NOTE if we do this, we need to add an expiry date as the user might
# still try to access this
# start = time.time()
# _, files = os_storage.listdir(self.get_folder_name())
# for file_ in files:
# # TODO - only delete these if they are older than 'x'
# # if force_delete or \
# # (datetime.datetime.now() - os_storage.get_modified_time(
# # f'{self.get_folder_name()}{file}')).total_seconds() / 60 < 5:
# if self.verbosity > 1:
# print(file_)
# os_storage.delete(f'{self.get_folder_name()}{file_}')
# end = time.time()
# if self.verbosity > 1:
# print('clean_zip_textures {}'.format(end - start))
def clean_os_storage(self, force_delete=False):
start = time.time()
# 1. clean zip
try:
_, files = os_storage.listdir(self.get_folder_name())
for file_ in files:
if force_delete or \
(datetime.datetime.now() - os_storage.get_modified_time(
f'{self.get_folder_name()}{file_}')).total_seconds() / 60 < 5:
if self.verbosity > 1:
print(file_)
os_storage.delete(f'{self.get_folder_name()}{file_}')
except Exception:
if self.verbosity > 1:
print(traceback.format_exc())
# 2. clean temp
try:
dirs, _ = os_storage.listdir(f'{self.get_folder_name()}temp/')
for dir_ in dirs:
if force_delete or \
(datetime.datetime.now() - os_storage.get_modified_time(
f'{self.get_folder_name()}temp/{dir_}')).total_seconds() / 60 < 5:
if self.verbosity > 1:
print(dir_)
print(os_storage.path(f'{self.get_folder_name()}temp/{dir_}'))
shutil.rmtree(
os_storage.path(f'{self.get_folder_name()}temp/{dir_}'),
ignore_errors=False)
except Exception:
if self.verbosity > 1:
print(traceback.format_exc())
end = time.time()
if self.verbosity > 1:
print('clean_temp_textures {}'.format(end - start))
def process_customer_request_for_textures(self, request_texture):
user = request_texture.user
product = request_texture.product
email = request_texture.get_email()
if (request_texture.status in [RequestTexture.STATUS_PENDING, ]):
# start process
request_texture.status = RequestTexture.STATUS_PROCESSING
request_texture.save()
else:
# TODO handle this as it would indicate that the task is
# running twice, or running on a non pending item
return
try:
self.get_or_create_zip_of_textures(product.sap_material_id)
except Exception as e:
request_texture.status = RequestTexture.STATUS_FAILED
request_texture.error = traceback.format_exc()
request_texture.save()
log_error(e)
return
recipients = [email, ]
context = {
'domain': user.usersite.site.domain,
'product': product,
}
body = render_to_string('customers/email/texture_download.txt', context)
body_html = render_to_string('customers/email/texture_download.html', context)
try:
email_message = EmailMultiAlternatives(
f'Download request for { product.title } { product.subtitle } digital textures',
body, settings.DEFAULT_FROM_EMAIL, recipients)
email_message.attach_alternative(body_html, 'text/html')
email_message.send(fail_silently=False)
except Exception as e:
request_texture.status = RequestTexture.STATUS_FAILED
request_texture.error = traceback.format_exc()
request_texture.save()
log_error(e)
return
request_texture.status = RequestTexture.STATUS_COMPLETED
request_texture.save()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment