Thanks Mikio!
#!/usr/bin/python
import atexit
import csv
import os
import glob
import time
import heapq
import itertools
from struct import Struct
from array import array
from kyotocabinet import *
import superfastmatch
# pip install git+http://github.com/donovanhide/SuperFastMatch.git

def open_db(file, flags=DB.OWRITER | DB.OCREATE):
    """Open a Kyoto Cabinet database and register a close handler at exit."""
    db = DB()
    db.open(file, flags)
    print "Opened %s" % file
    @atexit.register
    def cleanup():
        db.close()
        print "Closed %s" % file
    return db

#@profile
def key_value(file):
    """Yield (key, value, db) tuples by iterating over the database keys."""
    db = open_db(file, DB.OREADER)
    for key in db:
        yield (int(key), db.get(key), db)

#@profile
def key_value_2(file):
    """Yield (key, value, db) tuples via a cursor; avoids a get() per key."""
    db = open_db(file, DB.OREADER)
    cursor = db.cursor()
    cursor.jump()
    next_key_value = cursor.get
    try:
        while True:
            key, value = next_key_value(True)
            yield (int(key), value, db)
    except TypeError:
        # cursor.get() returns None once the cursor is exhausted
        return

#@profile
def merge(input_directory, output_file, hash_width, group_size=20):
    """Merge the intermediate .kct trees into one sorted index."""
    num_keys = 1 << hash_width
    index = open_db('%s#rcomp=dec#bnum=%s#psiz=65536#pccap=512m#msiz=512m' % (output_file, num_keys / 10))
    files = glob.glob(os.path.join(input_directory, '*.kct'))
    try:
        files.remove(output_file)
    except ValueError:
        pass
    for step in range(0, len(files), group_size):
        trees = [key_value_2('%s#pccap=4m' % file) for file in files[step:step + group_size]]
        start = time.time()
        last_time = start
        counter = 0
        # heapq.merge yields keys in globally sorted order across all trees
        for key, value, db in heapq.merge(*trees):
            counter += 1
            if counter % 100000 == 0:
                now = time.time()
                print "%s appends/sec (%s avg) at key %s" % (100000 / (now - last_time), counter / (now - start), key)
                # print db.status()
                last_time = now
            index.append(key, value)

#@profile
def fill(input_file, output_directory, window_size, hash_width, memory_limit):
    """Hash the documents in a CSV file and spill sorted .kct trees to disk."""
    num_keys = 1 << hash_width
    packer = Struct('I')
    csvfile = open(input_file, 'r')
    csv.field_size_limit(1000000000)
    reader = csv.DictReader(csvfile)
    keys = [False] * num_keys
    cache = open_db('cache#type=*#bnum=%s' % num_keys)
    for record in reader:
        id = packer.pack(int(record['id']))
        def cache_append(key):
            keys[key] = True
            cache.append(key, id)
        hashes = set(superfastmatch.hashes(record['content'], window_size, hash_width))
        map(cache_append, hashes)
        if cache.size() > memory_limit:
            # Flush the in-memory cache to a timestamped on-disk tree
            timestamp = time.time()
            tree = open_db(os.path.join(output_directory, '%s.kct#rcomp=dec#bnum=%s#psiz=32768' % (timestamp, num_keys / 10)))
            start = time.time()
            print "Writing %s.kct with %s keys at %s" % (timestamp, cache.count(), time.ctime())
            def tree_copy(key):
                tree.set(key, cache.get(key))
            full_keys = (i for i in xrange(0, num_keys) if keys[i])
            map(tree_copy, full_keys)
            cache.clear()
            keys = [False] * num_keys
            print "Written in %s seconds" % (time.time() - start)
            tree.close()
    # NB: hashes still in the cache when the CSV is exhausted are not flushed here.

if __name__ == '__main__':
    #fill('/media/Fast/news_articles.csv','/media/Fast/',15,24,1500000000)
    merge('/media/Fast/', '/media/Fast/index.kct', 24)
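For reference, here is a minimal, self-contained sketch of the k-way merge pattern that merge() relies on, using toy in-memory shards and a plain dict in place of the Kyoto Cabinet index (the shard data, the dict, and the concatenation step are illustrative assumptions, not part of the gist). heapq.merge only requires that each input generator is already sorted by key, which the .kct trees guarantee via their decimal key comparator.

import heapq

# Three pretend on-disk trees, each already sorted by key,
# standing in for the generators returned by key_value_2().
shards = [
    iter([(1, 'a'), (5, 'b')]),
    iter([(2, 'c'), (5, 'd')]),
    iter([(3, 'e'), (9, 'f')]),
]

# Plain dict standing in for the output index; the real code calls
# index.append(key, value) on a Kyoto Cabinet tree instead.
index = {}
for key, value in heapq.merge(*shards):
    # Keys arrive in globally sorted order, so entries for the same
    # key from different shards are adjacent and can simply be appended.
    index[key] = index.get(key, '') + value

print(sorted(index.items()))  # [(1, 'a'), (2, 'c'), (3, 'e'), (5, 'bd'), (9, 'f')]

Because each spill produced by fill() is written in key order, the final merge never needs to hold more than one entry per shard in memory at a time.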
The first few hundred thousand appends yield this:
but after about 1.5 GB of merging I get this: