Skip to content

Instantly share code, notes, and snippets.

@lrhazi
Forked from wolfkden/NoServerAvailable Exception
Created October 12, 2012 03:29
Show Gist options
  • Save lrhazi/3877133 to your computer and use it in GitHub Desktop.
Save lrhazi/3877133 to your computer and use it in GitHub Desktop.
doc_writer
import json
import datetime
import time
import thread
import os
from thrift import Thrift
from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.protocol.TBinaryProtocol import TBinaryProtocolAccelerated
from pyes import *
from gzip import GzipFile
class DocWriter:
    """Load gzipped, line-delimited JSON documents into an Elasticsearch index.

    Every file found in ``data_path`` is read line by line, each line is parsed
    as one JSON document, and the document is sent to the cluster through the
    ``ES`` client (expected to come from the module's ``pyes`` star import).
    ``key_id`` is a running counter that is used as the document id.
    """

    def __init__(self, hosts):
        """Connect to the cluster, then create the index and push its mapping.

        Args:
            hosts: iterable of strings; each one is appended to
                ``server_properties['base']`` to form a host address
                (see ``set_host``).
        """
        self.key_id = 0
        self.port = '9500'
        self.server_properties = {"base": "192.168.0.", "port": "9500"}
        self.index = {"name": "twitter"}
        self.type = {"name": "tweet"}
        # A real list (not a lazy iterator) so the client can re-read it.
        self.hosts = [self.set_host(host) for host in hosts]
        self.property_files = {
            "base": "/root/F7setup",
            "index": "index_settings",
            "mapping": "tweet_mapping",
        }
        self.data_path = '/root/F7setup/Cluster/data'
        self.connect = self.connection()
        self.set_files()
        try:
            self.set_index()
        except Exception:
            # Deliberately best-effort: index creation is expected to fail when
            # the index is already there.
            # NOTE(review): *any* failure (unreadable settings file, no server
            # reachable, ...) is also reported as "already exists".
            print("Index: " + self.index['name'] + " already exists")
        else:
            print("Index: " + self.index['name'] + " is set")
        # NOTE(review): the source's indentation was lost; the mapping is
        # assumed to be applied whether or not the index was just created.
        self.set_mapping()

    def set_host(self, host):
        """Return ``"<base><host>:<port>"`` built from ``server_properties``."""
        props = self.server_properties
        return props['base'] + host + ":" + props['port']

    def connection(self):
        """Create the ``ES`` client for ``self.hosts``."""
        return ES(self.hosts, timeout=12.0, bulk_size=10000, max_retries=10)

    def delete_index(self):
        """Delete the index named by ``self.index['name']``."""
        self.connect.delete_index(self.index['name'])

    def set_index(self):
        """Read the index settings file and create the index it describes.

        The file must hold a JSON object with an ``"index"`` member that
        provides ``"name"`` and ``"properties"``. ``self.index`` is replaced
        by that member *before* the create call, so it is updated even when
        creation fails.
        """
        path = os.path.join(self.property_files['base'],
                            self.property_files['index'])
        with open(path, 'r') as f:
            settings = json.load(f)
        self.index = settings['index']
        self.connect.create_index(self.index['name'], self.index['properties'])

    def set_mapping(self):
        """Read the mapping file and put the mapping on the current index.

        The file must hold a JSON object with a ``"type"`` member that provides
        at least ``"name"``. Every JSON object in the file is passed through
        ``fix_mapping`` while it is decoded.
        """
        path = os.path.join(self.property_files['base'],
                            self.property_files['mapping'])
        with open(path, 'r') as f:
            mapping = json.load(f, object_hook=self.fix_mapping)
        self.type = mapping['type']
        self.connect.put_mapping(self.type['name'], self.type,
                                 self.index['name'])

    def set_files(self):
        """Refresh ``self.data_files`` from the data directory."""
        self.data_files = self.get_files()

    def get_files(self):
        """Return the paths of all entries in ``data_path``, sorted by name.

        ``os.listdir`` returns entries in arbitrary order; sorting gives
        ``write_files_to_es(file_count_offset=...)`` a stable meaning between
        runs.
        """
        return [os.path.join(self.data_path, name)
                for name in sorted(os.listdir(self.data_path))]

    def write_files_to_es(self, file_count_offset=0):
        """Bulk-load every data file and print progress and timing per file.

        Args:
            file_count_offset: number of leading files (in ``get_files``
                order) to skip, e.g. to resume an interrupted run.
        """
        last = start = datetime.datetime.now()
        # Defined up front so the final duration line also works when there is
        # nothing to load.
        finish = start
        self.set_files()
        del self.data_files[0:file_count_offset]
        for file in self.data_files:
            self.write_block_to_es(file)
            print("document count: " + str(self.key_id))
            print(file + " written to es")
            finish = datetime.datetime.now()
            print("time span:  " + str(finish - start))
            print("block time span:  " + str(finish - last))
            last = finish
        print(" duration:  " + str(finish - start))

    def write_file_to_es(self, file):
        """Index the documents of one gzipped file one request at a time.

        Stops at the first response that has no ``'ok'`` key, after printing
        the file name, the current counter and the response.

        Args:
            file: path of a gzip file holding one JSON document per line.

        Returns:
            The value of ``key_id`` after the last document that was sent.
        """
        with GzipFile(file, 'r') as f:
            for raw in f:
                self.key_id += 1
                doc = json.loads(raw, object_hook=self.fix_obj)
                response = self.connect.index(doc, self.index['name'],
                                              self.type['name'], self.key_id)
                # A missing key raises KeyError (not NameError), so test for
                # the key directly.
                if 'ok' not in response:
                    print("file name:  " + str(file)
                          + "  count:  " + str(self.key_id))
                    print(response)
                    return self.key_id
        return self.key_id

    def write_block_to_es(self, file):
        """Queue the documents of one gzipped file with ``bulk=True``.

        This method does not flush the bulk queue itself.

        Args:
            file: path of a gzip file holding one JSON document per line.

        Returns:
            The value of ``key_id`` after the last queued document.
        """
        with GzipFile(file, 'r') as f:
            for raw in f:
                self.key_id += 1
                doc = json.loads(raw, object_hook=self.fix_obj)
                self.connect.index(doc, self.index['name'], self.type['name'],
                                   self.key_id, bulk=True)
        return self.key_id

    def write_file_line_to_es(self, file, line_no):
        """Print and index only line ``line_no`` (1-based) of a gzipped file.

        ``key_id`` is incremented once for every line read up to and including
        the selected one, so the document id equals the id it would have had
        in a full load of that file starting from the same counter.

        Args:
            file: path of a gzip file holding one JSON document per line.
            line_no: 1-based number of the line to index.

        Returns:
            The value of ``key_id`` after reading stopped.
        """
        with GzipFile(file, 'r') as f:
            for line_cnt, raw in enumerate(f, 1):
                self.key_id += 1
                if line_cnt == line_no:
                    print(raw)
                    doc = json.loads(raw, object_hook=self.fix_obj)
                    self.connect.index(doc, self.index['name'],
                                       self.type['name'], self.key_id)
                    break
        return self.key_id

    def fix_mapping(self, obj):
        """JSON object hook: default every property to ``index: not_analyzed``.

        Properties that already carry an ``'index'`` setting are left alone.
        """
        if 'properties' in obj:
            for field in obj['properties'].values():
                if 'index' not in field:
                    field['index'] = 'not_analyzed'
        return obj

    def fix_obj(self, obj):
        """JSON object hook: turn selected numeric tweet fields into strings.

        ``in_reply_to_user_id`` is always stringified. ``retweet_count`` is
        stringified inside ``retweeted_status`` when that member is present,
        otherwise on ``obj`` itself.
        """
        if 'in_reply_to_user_id' in obj:
            obj['in_reply_to_user_id'] = str(obj['in_reply_to_user_id'])
        if 'retweeted_status' in obj:
            nested = obj['retweeted_status']
            if 'retweet_count' in nested:
                nested['retweet_count'] = str(nested['retweet_count'])
        elif 'retweet_count' in obj:
            obj['retweet_count'] = str(obj['retweet_count'])
        return obj
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment