-
-
Save lrhazi/3877133 to your computer and use it in GitHub Desktop.
doc_writer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import datetime | |
import time | |
import thread | |
import os | |
from thrift import Thrift | |
from thrift.transport import TTransport | |
from thrift.transport import TSocket | |
from thrift.protocol.TBinaryProtocol import TBinaryProtocolAccelerated | |
from pyes import * | |
from gzip import GzipFile | |
class DocWriter:
    """Load gzipped, line-delimited tweet JSON files into Elasticsearch via pyes.

    On construction the writer builds a pyes ``ES`` client and lists the data
    files. It then creates the index from the ``index_settings`` file (or
    reports that creation failed, presumably because the index already exists)
    and puts the type mapping read from the ``tweet_mapping`` file. Every
    indexed document gets the next sequential id from ``self.key_id``.
    """

    def __init__(self, hosts):
        """Configure the writer, connect, and prepare the index and mapping.

        Args:
            hosts: iterable of strings; each is appended to the
                ``"192.168.0."`` prefix by ``set_host`` (presumably the last
                octet of a node address).

        Raises:
            IOError/OSError/ValueError: the data directory or the index
                settings file is missing or unreadable, or the settings are
                not valid JSON. Config-file errors used to be misreported as
                "already exists".
        """
        self.key_id = 0
        # NOTE(review): not read anywhere in this class; set_host uses server_properties['port'].
        self.port = '9500'
        self.server_properties = {"base": "192.168.0.", "port": "9500"}
        self.index = {"name": "twitter"}  # replaced by the settings file in set_index
        self.type = {"name": "tweet"}  # replaced by the mapping file in set_mapping
        # A real list (not a lazy map object) so it can be reused on Python 3.
        self.hosts = [self.set_host(host) for host in hosts]
        self.property_files = {"base": "/root/F7setup", "index": "index_settings", "mapping": "tweet_mapping"}
        self.data_path = '/root/F7setup/Cluster/data'
        self.connect = self.connection()
        self.set_files()
        try:
            self.set_index()
        except (IOError, OSError, ValueError):
            # A missing/unreadable settings file or malformed JSON is a local
            # configuration problem, not an "index already exists" condition.
            raise
        except Exception:
            # NOTE(review): assumed to be the server refusing to re-create an
            # existing index; pyes's specific exception type is not imported
            # here, so this handler stays broad.
            print("Index: " + self.index['name'] + " already exists")
        else:
            print("Index: " + self.index['name'] + " is set")
        self.set_mapping()

    def set_host(self, host):
        """Return ``"<base><host>:<port>"`` built from ``self.server_properties``."""
        return self.server_properties['base'] + host + ":" + self.server_properties['port']

    def connection(self):
        """Create the pyes ``ES`` client for ``self.hosts``.

        ``bulk_size`` is the number of ``bulk=True`` operations pyes queues
        before sending them (see ``write_block_to_es``).
        """
        return ES(self.hosts, timeout=12.0, bulk_size=10000, max_retries=10)

    def delete_index(self):
        """Delete the index named ``self.index['name']``."""
        self.connect.delete_index(self.index['name'])

    def set_index(self):
        """Create the index described by the index settings file.

        The file is read as JSON, and only its ``"index"`` object is used. That
        object must contain ``"name"`` and ``"properties"``, and it replaces
        ``self.index`` before the create request is sent.
        """
        path = os.path.join(self.property_files['base'], self.property_files['index'])
        with open(path, 'r') as f:
            settings = json.load(f)['index']
        self.index = settings
        self.connect.create_index(settings['name'], settings['properties'])

    def set_mapping(self):
        """Read the mapping file, apply ``fix_mapping``, and put the mapping.

        The file's ``"type"`` object replaces ``self.type``. It is sent as the
        mapping for the type ``self.type['name']`` in index ``self.index['name']``.
        """
        path = os.path.join(self.property_files['base'], self.property_files['mapping'])
        with open(path, 'r') as f:
            mapping = json.load(f, object_hook=self.fix_mapping)
        self.type = mapping['type']
        self.connect.put_mapping(self.type['name'], self.type, self.index['name'])

    def set_files(self):
        """Refresh ``self.data_files`` from the data directory."""
        self.data_files = self.get_files()

    def get_files(self):
        """Return the paths of all entries in ``self.data_path``, sorted by name.

        Sorting matters because ``os.listdir`` order is arbitrary, and
        ``write_files_to_es`` skips files by position (``file_count_offset``).
        """
        return [os.path.join(self.data_path, name) for name in sorted(os.listdir(self.data_path))]

    def write_files_to_es(self, file_count_offset=0):
        """Bulk-index every data file, optionally skipping the first few.

        Args:
            file_count_offset: number of leading files (in sorted order) to
                skip; useful for resuming a load that stopped part-way.

        After each file, prints the running document count and elapsed times.
        At the end, forces out the last partial bulk batch so no queued
        documents are left unsent.
        """
        start = last = datetime.datetime.now()
        self.set_files()
        del self.data_files[0:file_count_offset]
        for path in self.data_files:
            self.write_block_to_es(path)
            print("document count: " + str(self.key_id))
            print(path + " written to es")
            finish = datetime.datetime.now()
            print("time span:  %s" % (finish - start,))
            print("block time span:  %s" % (finish - last,))
            last = finish
        # Operations below the bulk_size threshold are still queued; send them.
        self.connect.flush_bulk(True)
        # Taken after the loop so an empty file list cannot leave `finish` unbound.
        finish = datetime.datetime.now()
        print(" duration:  %s" % (finish - start,))

    def write_file_to_es(self, file):
        """Index one gzipped file one request at a time (no bulk batching).

        Each line is parsed as JSON with ``fix_obj`` applied and indexed under
        the next ``self.key_id``. Stops at the first response that has no
        ``'ok'`` key, and prints the file name, the id and the response.

        Returns:
            The last ``key_id`` used.
        """
        with GzipFile(file, 'r') as f:
            for raw in f:
                self.key_id += 1
                doc = json.loads(raw, object_hook=self.fix_obj)
                response = self.connect.index(doc, self.index['name'], self.type['name'], self.key_id)
                try:
                    response['ok']
                except (KeyError, TypeError):
                    # A missing key raises KeyError (TypeError if the response is
                    # not subscriptable) -- never NameError.
                    print("file name:  %s  count:  %s" % (file, self.key_id))
                    print(response)
                    break
        return self.key_id

    def write_block_to_es(self, file):
        """Queue every line of one gzipped file for bulk indexing.

        Documents are sent with ``bulk=True``, so pyes queues them. Any final
        batch smaller than ``bulk_size`` stays queued until the client is
        flushed; ``write_files_to_es`` does this once after all files.

        Returns:
            The last ``key_id`` used.
        """
        with GzipFile(file, 'r') as f:
            for raw in f:
                self.key_id += 1
                doc = json.loads(raw, object_hook=self.fix_obj)
                self.connect.index(doc, self.index['name'], self.type['name'], self.key_id, bulk=True)
        return self.key_id

    def write_file_line_to_es(self, file, line_no):
        """Print and index only line ``line_no`` (1-based) of a gzipped file.

        ``self.key_id`` is advanced once for every line read up to and
        including the target. The indexed document therefore gets the same
        id offset it would have had in a full pass. If ``line_no`` is never
        reached, every line still advances ``key_id`` and nothing is indexed.

        Returns:
            The resulting ``key_id``.
        """
        with GzipFile(file, 'r') as f:
            for line_cnt, raw in enumerate(f, 1):
                self.key_id += 1
                if line_cnt == line_no:
                    print(raw)
                    doc = json.loads(raw, object_hook=self.fix_obj)
                    self.connect.index(doc, self.index['name'], self.type['name'], self.key_id)
                    break
        return self.key_id

    def fix_mapping(self, obj):
        """``json`` object_hook: default every property to ``'index': 'not_analyzed'``.

        Applies to any decoded object that has a ``'properties'`` dict.
        Properties that already declare ``'index'`` are left unchanged.
        """
        if 'properties' in obj:
            for definition in obj['properties'].values():
                definition.setdefault('index', 'not_analyzed')
        return obj

    def fix_obj(self, obj):
        """``json`` object_hook: stringify id/count fields of a tweet object.

        Converts ``in_reply_to_user_id`` and ``retweet_count`` to ``str``.
        This happens on the object itself and on ``retweet_count`` inside a
        ``retweeted_status`` dict, which also covers direct calls outside an
        object_hook. The top-level ``retweet_count`` is now converted even when
        ``retweeted_status`` is present. JSON null values are kept as ``None``
        rather than becoming the string ``"None"``.
        """
        for key in ('in_reply_to_user_id', 'retweet_count'):
            if obj.get(key) is not None:
                obj[key] = str(obj[key])
        retweeted = obj.get('retweeted_status')
        if isinstance(retweeted, dict) and retweeted.get('retweet_count') is not None:
            retweeted['retweet_count'] = str(retweeted['retweet_count'])
        return obj
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment