Upload product ↔ category associations
#!/usr/bin/env python3
#
# https://gist.github.com/wking/cf87063bd5f76be73db0

import csv
import json
import logging
import sys
import time
import urllib.request


logging.basicConfig(level=logging.INFO)


def read_categories(stream=sys.stdin):
    """Build a nested category tree from the CSV on 'stream'."""
    categories = {'children': {}}
    logging.info('loading categories')
    for row in csv.DictReader(stream):
        parent = categories
        for field in ['Department', 'Aisle', 'Section', 'Shelf', 'Space']:
            category = {
                'name': row[field],
                'short-name': row['{} URL'.format(field)],
                'children': {},
            }
            # Blank or '*' cells mark the end of this row's path.
            if (category['name'] in ['', '*'] or
                    category['short-name'] in ['', '*']):
                break
            if category['name'] not in parent['children']:
                parent['children'][category['name']] = category
            parent = parent['children'][category['name']]
    return categories


def upload_categories(base_url, headers, categories, parent_id=None):
    """Recursively POST categories the server doesn't already have."""
    logging.info('uploading categories')
    if categories['children']:
        current_categories = download_categories(
            base_url=base_url, headers=headers, parent_id=parent_id)
        for name, category in categories['children'].items():
            category['parent'] = parent_id
            if category['name'] not in current_categories:
                category_id = upload_category(
                    base_url=base_url, headers=headers, category=category)
            else:
                category_id = current_categories[category['name']]
            upload_categories(
                base_url=base_url, headers=headers, categories=category,
                parent_id=category_id)


def download_categories(base_url, headers, parent_id=None):
    """Return a name -> id mapping for the children of 'parent_id'."""
    logging.info('download categories with parent {}'.format(parent_id))
    if parent_id is None:
        pid = 'null'
    else:
        pid = parent_id
    request = urllib.request.Request(
        url='{base}/categories?parent={pid}&limit=250'.format(
            base=base_url, pid=pid),
        headers=headers,
        method='GET',
    )
    with urllib.request.urlopen(request) as response:
        categories_bytes = response.read()
        charset = response.headers.get_content_charset()
    categories_json = categories_bytes.decode(charset)
    categories = json.loads(categories_json)
    name_ids = {cat['name']: cat['id'] for cat in categories}
    logging.info('downloaded categories with parent {}: {}'.format(
        parent_id, name_ids))
    time.sleep(1)  # rate-limit our API requests
    return name_ids


def upload_category(base_url, headers, category):
    """POST a single category and return the id the server assigns."""
    data = category.copy()
    data.pop('children')
    logging.info('upload category {}'.format(data))
    request = urllib.request.Request(
        url='{base}/categories'.format(base=base_url),
        data=json.dumps(data).encode('UTF-8'),
        headers=headers,
        method='POST',
    )
    with urllib.request.urlopen(request) as response:
        new_category_bytes = response.read()
        charset = response.headers.get_content_charset()
    new_category_json = new_category_bytes.decode(charset)
    new_category = json.loads(new_category_json)
    logging.info('uploaded category {} with id {}'.format(
        new_category['name'], new_category['id']))
    time.sleep(1)  # rate-limit our API requests
    return new_category['id']


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('categories')
    parser.add_argument('--base-url', default='https://api.azurestandard.com')
    parser.add_argument(
        '-H', '--header', action='append', default=[], dest='headers')
    args = parser.parse_args()
    headers = {}
    for header in args.headers:
        key, value = [x.strip() for x in header.split(':', 1)]
        headers[key] = value
    with open(args.categories, 'r') as stream:
        categories = read_categories(stream=stream)
    upload_categories(
        base_url=args.base_url, headers=headers, categories=categories)
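
The category CSV feeds read_categories() one path per row, with a name column and a matching '<name> URL' column for each level. Here is a minimal sketch of the expected layout, assuming read_categories() from the script above is in scope; the category names are invented examples, not real Azure Standard data:

import io

# Each row is one path through the tree; '*' cells end the path early.
sample = io.StringIO(
    'Department,Department URL,Aisle,Aisle URL,'
    'Section,Section URL,Shelf,Shelf URL,Space,Space URL\n'
    'Grocery,grocery,Baking,baking,Flour,flour,*,*,*,*\n'
)
tree = read_categories(stream=sample)
# The row above becomes a three-level path:
#   tree['children']['Grocery']['children']['Baking']['children']['Flour']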
#!/usr/bin/env python3
#
# https://gist.github.com/wking/cf87063bd5f76be73db0

import csv
import json
import logging
import sys
import time
import urllib.error
import urllib.parse
import urllib.request


logging.basicConfig(level=logging.INFO)


def get_categories(base_url, cache='/tmp/categories.json'):
    """Return a (name, parent-id) -> id mapping, caching the raw JSON."""
    try:
        with open(cache, 'r') as stream:
            categories_json = stream.read()
    except FileNotFoundError:
        logging.info('requesting categories')
        categories = []
        count = None
        start = 0
        limit = 250
        # Page through the listing with start/limit until we have all
        # 'Count' entries.
        while True:
            logging.info('requesting categories ({} from {} of {})'.format(
                limit, start, count))
            with urllib.request.urlopen(
                    '{}/categories?{}'.format(
                        base_url,
                        urllib.parse.urlencode({
                            'start': start,
                            'limit': limit,
                        }))
                    ) as response:
                categories_bytes = response.read()
                charset = response.headers.get_content_charset()
                if count is None:
                    count = int(response.headers['Count'])
            new_json = categories_bytes.decode(charset)
            categories.extend(json.loads(new_json))
            if len(categories) >= count:
                break
            start += limit
        categories_json = json.dumps(categories)
        with open(cache, 'w') as stream:
            stream.write(categories_json)
    categories = {}
    cats = json.loads(categories_json)
    for category in cats:
        key = (category['name'], category.get('parent'))
        categories[key] = category['id']
        logging.debug('category {} -> {}'.format(key, category['id']))
    return categories


def get_subheaders(base_url, headers, categories, stream=sys.stdin):
    """Map each SubHeaders value to the category ids its rows describe."""
    subheaders = {}
    logging.info('loading subheaders')
    for row in csv.DictReader(stream):
        subheader = row['SubHeaders'].strip().lower()
        if not subheader:
            continue
        parent_id = None
        for field in ['Department', 'Aisle', 'Section', 'Shelf', 'Space']:
            name = row[field]
            if name in ['', '*']:
                break
            short_name = row['{} URL'.format(field)]
            key = (name, parent_id)
            try:
                id = categories[key]
            except KeyError:
                logging.info('missing category {} ({})'.format(key, row))
                try:
                    id = upload_category(
                        base_url=base_url, headers=headers, category={
                            'name': name,
                            'short-name': short_name,
                            'parent': parent_id,
                        })
                except urllib.error.HTTPError:
                    # The POST can fail if the category already exists
                    # server-side, so fall back to looking it up by
                    # parent and name.
                    with urllib.request.urlopen(
                            '{}/categories?{}'.format(
                                base_url,
                                urllib.parse.urlencode(
                                    {'parent': parent_id}))
                            ) as response:
                        categories_bytes = response.read()
                        charset = response.headers.get_content_charset()
                    categories_json = categories_bytes.decode(charset)
                    cats = json.loads(categories_json)
                    category = [
                        cat for cat in cats if cat['name'] == name][0]
                    id = category['id']
                categories[key] = id
            parent_id = id
        if subheader not in subheaders:
            subheaders[subheader] = []
        subheaders[subheader].append(id)
        logging.debug('subheaders {} -> {}'.format(
            subheader, subheaders[subheader]))
    return subheaders


def upload_category(base_url, headers, category):
    """POST a single category and return the id the server assigns."""
    logging.info('upload category {}'.format(category))
    request = urllib.request.Request(
        url='{base}/categories'.format(base=base_url),
        data=json.dumps(category).encode('UTF-8'),
        headers=headers,
        method='POST',
    )
    with urllib.request.urlopen(request) as response:
        new_category_bytes = response.read()
        charset = response.headers.get_content_charset()
    new_category_json = new_category_bytes.decode(charset)
    new_category = json.loads(new_category_json)
    logging.info('uploaded category {} with id {}'.format(
        new_category['name'], new_category['id']))
    time.sleep(1)  # rate-limit our API requests
    return new_category['id']


def associate_products(base_url, headers, subheaders, stream=sys.stdin):
    """POST a product -> category association for every product row."""
    logging.info('associating products')
    missing = set()
    for row in csv.DictReader(stream):
        subheader = row['SubHeaders'].strip().lower()
        if subheader in [
                'soon to be discontinued',
                ]:
            continue
        code = row['Item code'].strip()
        try:
            category_ids = subheaders[subheader]
        except KeyError as error:
            # Only log each unknown subheader once.
            if subheader not in missing:
                logging.error(str(error))
                missing.add(subheader)
            continue
        for category_id in category_ids:
            associate_product(
                base_url=base_url, headers=headers,
                code=code, category_id=category_id)


def associate_product(base_url, headers, code, category_id):
    logging.info('associate {} with {}'.format(code, category_id))
    request = urllib.request.Request(
        url='{base}/packaged-product/{code}/category/{id}'.format(
            base=base_url, code=code, id=category_id),
        headers=headers,
        method='POST',
    )
    with urllib.request.urlopen(request):
        logging.info('associated {} with {}'.format(code, category_id))
    time.sleep(1)  # rate-limit our API requests


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('categories')
    parser.add_argument('products')
    parser.add_argument('--base-url', default='https://api.azurestandard.com')
    parser.add_argument(
        '-H', '--header', action='append', default=[], dest='headers')
    args = parser.parse_args()
    headers = {}
    for header in args.headers:
        key, value = [x.strip() for x in header.split(':', 1)]
        headers[key] = value
    categories = get_categories(base_url=args.base_url)
    with open(args.categories, 'r') as stream:
        subheaders = get_subheaders(
            base_url=args.base_url, headers=headers,
            categories=categories, stream=stream)
    with open(args.products, 'r') as stream:
        associate_products(
            base_url=args.base_url, headers=headers,
            subheaders=subheaders, stream=stream)
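
Both scripts accept curl-style -H/--header flags for authentication and content negotiation. A small illustration of how those strings end up in the headers dict passed to urllib, using the same split-on-first-colon parsing as the __main__ blocks above (the credentials are made up):

raw_headers = ['Authorization: Basic dXNlcjpwYXNz']
headers = {}
for header in raw_headers:
    # Split on the first colon only, so header values may contain colons.
    key, value = [x.strip() for x in header.split(':', 1)]
    headers[key] = value
# headers == {'Authorization': 'Basic dXNlcjpwYXNz'}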
For the live upload, I dropped fix_subheaders. I converted the Excel files to CSV and ran the scripts above. The existing-categories download in product-category-upload.py doesn't work with the new limited queries from azurestandard/beehive@2728652 (Merge branch 'api-limit', 2015-02-20), but I worked around that by seeding /tmp/categories.json with data dumped from a Django shell. Now that we support start (azurestandard/api-spec@86d4d29, public.json: Add 'start=...' for offsetting list results, 2015-02-24), you could go that way too.
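
For reference, the seeding step could look something like the sketch below from a Django shell. The import path and field names (beehive.models.Category, parent_id) are assumptions, not the actual beehive models; the only real requirement is that the cache hold a JSON list of objects with 'id', 'name', and 'parent' keys, which is what get_categories() reads back:

import json

from beehive.models import Category  # hypothetical import path

# Dump every category in the shape get_categories() expects.
categories = [
    {'id': c.id, 'name': c.name, 'parent': c.parent_id}
    for c in Category.objects.all()
]
with open('/tmp/categories.json', 'w') as stream:
    json.dump(categories, stream)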