#!/usr/bin/env python3
# Incrementally import products from the Open Food Facts search API into a
# local CouchDB database, resuming from the timestamp of the previous run.
from datetime import datetime
from multiprocessing.pool import ThreadPool
import time
import urllib.request
import json
import threading
from queue import Queue
from couchdb import Server
page_size = 10
until = 1452657600  # Arbitrary date in January 2016. Change to whatever offset you need.
url = "http://world.openfoodfacts.org/cgi/search.pl?search_terms=&sort_by=last_modified_t&page_size=%s&json=1&page=%s"
threadPool = ThreadPool(10)  # Thread pool for the JSON decode workers
importerThreadCount = 10  # Number of workers for the CouchDB import
simultaneousDownloads = 1  # Concurrent download loops; keep well below the pool size, as they share threadPool with the decode workers
# Read the previous update time, then record the current time for the next run
with open("last_update.txt", "r+") as f:
    contents = f.read()
    if len(contents) > 0:
        until = int(contents) - 900  # Go back 15 minutes, in case something was missed
        print("Previous update: " + contents)
    f.seek(0)
    f.write(str(int(time.time())))
q = Queue()  # Hands parsed products from the downloaders to the importers
def parse_product(result):
    # Wrap an Open Food Facts result in our document format and queue it for
    # import; return its modification time so the download loop knows when it
    # has paged back far enough.
    if 'product_name' in result and len(result['product_name']) > 0:
        now = datetime.utcnow().isoformat("T") + "Z"
        p = {
            'name': result['product_name'],
            'barcode': result['code'],
            'meta': {
                'source': url,
                'license': "ODbL + ODcL",
                'retrieval_date': now,
                'update_date': now,
                'original': result,
            }
        }
        q.put(p)
        print(result['product_name'] + "\t\t\tadded to Queue")
    return int(result['last_modified_t'])
def execute_transfers(thread_id):
    # Page through the search results, most recently modified first, until we
    # reach the 'until' timestamp. Each downloader starts at its own page and
    # advances by the number of simultaneous downloaders.
    print("Downloader %d started" % thread_id)
    last_date = int(time.time())
    page = thread_id + 1
    while last_date > until:
        our_url = url % (page_size, page)
        with urllib.request.urlopen(our_url) as req:
            p = json.loads(req.read().decode())
        if len(p['products']) == 0:
            break  # Ran out of results before reaching 'until'
        result = threadPool.map(parse_product, p['products'])
        new_last_date = min(result)
        if not last_date > new_last_date:
            print("Warning: modification dates are not decreasing (possible infinite loop)")
        else:
            last_date = new_last_date
        page += simultaneousDownloads
def importer():
    # Pop parsed products off the queue and upsert them into CouchDB: update
    # the existing document when the barcode is already known, otherwise save
    # a new one.
    server = Server()
    productdb = server['products']
    while True:
        item = q.get()
        print(item['name'] + "\t\t\t\t\t popped from Queue")
        barcode = item['barcode']
        existingRecords = productdb.view('products/all')[barcode]
        if len(existingRecords.rows) > 0:
            doc = productdb[existingRecords.rows[0].id]
            doc.update(item)
            productdb[doc.id] = doc
        else:
            productdb.save(item)
        q.task_done()
# Start the importers as daemon threads so they exit with the main thread
for i in range(importerThreadCount):
    t = threading.Thread(target=importer)
    t.daemon = True
    t.start()
# Run the downloaders, then wait until every queued product has been imported
threadPool.map(execute_transfers, range(0, simultaneousDownloads))
q.join()
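
importer() assumes the products database already contains a products/all view keyed by barcode; the gist itself does not define it. Below is a minimal sketch of a bootstrap script that would create such a view, assuming CouchDB on the default http://localhost:5984/ and the barcode field written by the script above. The view body and the helper itself are assumptions, not part of the original gist.

#!/usr/bin/env python3
# bootstrap_view.py -- hypothetical helper, not part of the original gist.
from couchdb import Server

server = Server()  # Assumes CouchDB on the default http://localhost:5984/
db = server['products']
# Key each product document by its barcode so that
# productdb.view('products/all')[barcode] finds it.
db['_design/products'] = {
    'views': {
        'all': {
            'map': "function(doc) { if (doc.barcode) { emit(doc.barcode, null); } }"
        }
    }
}

With that view in place, the first run of the importer walks back to the hard-coded until timestamp, and every later run resumes from the time stored in last_update.txt.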