Skip to content

Instantly share code, notes, and snippets.

@lyhapple
Created September 13, 2013 00:50
Show Gist options
  • Select an option

  • Save lyhapple/6545716 to your computer and use it in GitHub Desktop.

Select an option

Save lyhapple/6545716 to your computer and use it in GitHub Desktop.
多线程对同步队列进行操作,创建ES索引的例子,使用生产者与消费者的数据模型进行处理。
#coding=utf-8
"""
Initialization module for the baike (encyclopedia) ES search engine.
"""
from Queue import Queue
import datetime
import time
import threading
import traceback
import pyes
import pymssql
from django.template.defaultfilters import striptags
__author__ = 'lyhapple'
########################### Configuration #####################################
# Maximum number of rows held in the synchronized queue.
# Also used below as the interval (in rows) between progress messages.
MAX_LIST_NUMBER = 10000
# Maximum number of threads writing to the ES index. This must not exceed 2:
# with more than 2 threads, ES connections to the index keep failing; the
# author suspects pyes is incomplete or leaks connections.
MAX_THREADS = 2
# ES server
ES_SERVER_IP_AND_PORT = ("http", '192.168.1.191', '9200')
# Common index name for the baike system; the whole system uses this index name.
BAIKE_ES_INDEX_NAME = u"baike_index"
# Index type names for entries; the whole system uses these names.
BAIKE_ES_TYPE_NAME = u"baike"
RENWU_ES_TYPE_NAME = u"renwu"
CIDIAN_ES_TYPE_NAME = u"cidian"
# Source queries, one per document type. Each selects five columns in the
# positional order build_doc() reads them: id, dxid, a title-like column,
# a body column, and a URL path built from dxid.
RENWU_SQL = """select id, dxid, name, intro, '/renwu/' + convert(varchar(255), dxid) + '.html'
from baike_renwuarticle"""
BAIKE_SQL = """select id, dxid, title, contents, '/baike/' + convert(varchar(255), dxid) + '.html'
from baike_baikearticle """
CIDIAN_SQL = """select id, dxid, original, translate, '/cidian/' + convert(varchar(255), dxid) + '.html'
from baike_dictionary """
# SQL Server connection settings read by get_mssql_conn().
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a config file that is not checked in.
MSSQL = {
"ip": "192.168.1.207",
"port": "1433",
"user": "sa",
"password": "delicacy",
"db": "baike"
}
# ES document structure (field mapping) definition.
# main() passes this dict to put_mapping(), which wraps it in {'properties': ...}
# and applies it to each document type.
ES_DOCUMENT_MAPPING = {
"id": {
"type": "string",
"index": "not_analyzed",
"store": "yes"
},
"dxid": {
"type": "string",
"index": "not_analyzed",
"store": "yes"
},
"title": {
"type": "string",
"indexAnalyzer": "smartcn",
"searchAnalyzer": "smartcn",
"store": "yes"
},
"content": {
"type": "string",
"indexAnalyzer": "smartcn",
"searchAnalyzer": "smartcn",
"store": "yes"
},
"url": {
"type": "string",
"index": "not_analyzed",
"store": "yes"
},
}
################################### End of configuration #####################################
def get_mssql_conn():
    """
    Open and return a new SQL Server connection.

    The connection parameters are read from the module-level ``MSSQL`` dict;
    the charset is fixed to "UTF-8".
    """
    settings = dict(
        server=MSSQL["ip"],
        port=MSSQL["port"],
        user=MSSQL["user"],
        password=MSSQL["password"],
        database=MSSQL["db"],
        charset="UTF-8",
    )
    return pymssql.connect(**settings)
def get_es_connection(default_index=BAIKE_ES_INDEX_NAME):
    """
    Create a pyes client for the configured ES server.

    :param default_index: value passed to pyes as ``default_indices``.
    :return: a new ``pyes.ES`` instance created with a timeout of 60.
    """
    servers = [ES_SERVER_IP_AND_PORT]
    es_client = pyes.ES(servers, default_indices=default_index, timeout=60)
    return es_client
def convert_chn_time(datetime):
    """
    Format a date/time value as ``YYYY-MM-DD HH:MM:SS``.

    :param datetime: any object providing ``strftime``. Inside this function
        the parameter name shadows the ``datetime`` module.
    :return: the formatted string, or "" when the argument is falsy.
    """
    return datetime.strftime('%Y-%m-%d %H:%M:%S') if datetime else ""
def create_index(conn, index_name=BAIKE_ES_INDEX_NAME):
    """
    Create the ES index on a best-effort basis.

    Any error raised by the client is printed as a traceback and then
    ignored, so the caller continues either way.

    :param conn: ES client exposing ``create_index(name, settings)``.
    :param index_name: name of the index to create.
    """
    # Index settings: refresh interval, number of replicas, number of shards.
    index_settings = {
        "refresh_interval": 0,
        "number_of_replicas": 0,
        "number_of_shards": 4,
    }
    try:
        conn.create_index(index_name, index_settings)
    except Exception:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit still propagate.
        traceback.print_exc()
def put_mapping(conn, mapping, index_names=None, doc_type=None):
    """
    Apply a field mapping to a document type.

    :param conn: ES client exposing ``put_mapping``.
    :param mapping: field definitions; sent wrapped as ``{'properties': mapping}``.
    :param index_names: indices to update; falls back to
        ``[BAIKE_ES_INDEX_NAME]`` when falsy.
    :param doc_type: document type the mapping belongs to.
    """
    target_indices = index_names or [BAIKE_ES_INDEX_NAME]
    wrapped = {'properties': mapping}
    conn.put_mapping(doc_type=doc_type, mapping=wrapped, indices=target_indices)
def add_index(conn, doc, index_name=BAIKE_ES_INDEX_NAME, doc_type=None):
    """
    Index a single document.

    The ES document id is ``doc["dxid"]`` when that key is present,
    otherwise ``doc["id"]``.

    :param conn: ES client exposing ``index(doc, index, doc_type, id)``.
    :param doc: document dict containing at least "dxid" or "id".
    :param index_name: target index.
    :param doc_type: target document type; required.
    :raises ValueError: if ``doc_type`` is missing or empty.
    """
    if not doc_type:
        # Previously this only printed the message and then sent the request
        # without a document type; fail before touching the connection.
        raise ValueError("please set doc_type value")
    eid = doc["dxid"] if "dxid" in doc else doc["id"]
    conn.index(doc, index_name, doc_type, eid)
def optimize(conn, index_names=None):
    """
    Ask the ES client to optimize the given indices.

    :param conn: ES client exposing ``optimize(indices)``.
    :param index_names: indices to optimize; falls back to
        ``[BAIKE_ES_INDEX_NAME]`` when falsy.
    """
    target_indices = index_names or [BAIKE_ES_INDEX_NAME]
    conn.optimize(target_indices)
def refresh(conn, index_names=None):
    """
    Ask the ES client to refresh the given indices.

    :param conn: ES client exposing ``refresh(indices)``.
    :param index_names: indices to refresh; falls back to
        ``[BAIKE_ES_INDEX_NAME]`` when falsy.
    """
    target_indices = index_names or [BAIKE_ES_INDEX_NAME]
    conn.refresh(target_indices)
def get_mssql_cursor(sql):
    """
    Execute *sql* on a fresh SQL Server connection and return the cursor.

    A new connection is opened on every call via ``get_mssql_conn()``; this
    function does not close it.

    :param sql: statement to execute.
    :return: the cursor after ``execute(sql)`` has been called on it.
    """
    connection = get_mssql_conn()
    result_cursor = connection.cursor()
    result_cursor.execute(sql)
    return result_cursor
def build_doc(data):
    """
    Turn one database row into an ES document dict.

    :param data: indexable row; positions 0-4 are read as id, dxid, title,
        content and url respectively.
    :return: dict with the keys "id", "dxid", "title", "content" and "url".
        Every value except "id" is converted with ``unicode``; the content
        has its tags stripped and newline characters removed.
    """
    document = {}
    document[u"id"] = data[0]
    document[u"dxid"] = unicode(data[1])
    document[u"title"] = unicode(data[2])
    plain_text = striptags(data[3]).replace("\n", '')
    document[u"content"] = unicode(plain_text)
    document[u"url"] = unicode(data[4])
    return document
class Producer(threading.Thread):
    """Thread that moves rows from a database cursor into the shared QUEUE."""

    def __init__(self, cursor):
        """
        :param cursor: object whose ``next()`` returns the next row and
            raises ``StopIteration`` once the rows are exhausted.
        """
        super(Producer, self).__init__()
        self.cursor = cursor

    def run(self):
        """
        Fetch rows until the cursor is exhausted, putting each one on QUEUE.

        A row whose fetch raises ``UnicodeDecodeError`` is skipped. A progress
        line is printed every MAX_LIST_NUMBER rows, and a summary at the end.
        """
        produced = 0
        while True:
            try:
                row = self.cursor.next()
            except UnicodeDecodeError:
                # Row could not be decoded; skip it and move on.
                continue
            except StopIteration:
                break
            # Blocking put: waits while the bounded queue is full.
            QUEUE.put(row, 1)
            produced += 1
            if produced % MAX_LIST_NUMBER == 0:
                print("add to QUEUE ===> %s" % produced)
        print("all done, and db total count is %s ******************" % produced)
        print("Producer thread stop..............")
class Customer(threading.Thread):
    """Consumer thread that indexes rows taken from the shared QUEUE into ES."""

    def __init__(self, doc_type):
        """
        :param doc_type: ES document type used for every row this thread indexes.
        """
        super(Customer, self).__init__()
        self.doc_type = doc_type

    def run(self):
        """
        Index rows for as long as QUEUE holds more than 100 items.

        Each thread opens its own ES connection. A row whose indexing raises
        is put on UN_SUCCESS_DOC_QUEUE instead of being dropped. The thread
        exits once the queue size falls to 100 or below; whatever is left in
        QUEUE at that point is not handled here.
        """
        global CUSTOMER_COUNT
        conn = get_es_connection()
        print("conn from %s ===> %s " % (self.name, id(conn)))
        while QUEUE.qsize() > 100:
            data = QUEUE.get(1)
            doc = build_doc(data)
            try:
                add_index(conn, doc, doc_type=self.doc_type)
            except Exception:
                # Narrowed from a bare ``except:`` so that SystemExit and
                # similar are not swallowed; the failed row is kept for a retry.
                UN_SUCCESS_DOC_QUEUE.put(data)
            # NOTE(review): this counter is updated by several threads without
            # a lock, so the logged totals may be approximate.
            CUSTOMER_COUNT += 1
            if CUSTOMER_COUNT % MAX_LIST_NUMBER == 0:
                print("insert into ES ===> %s ===> %s " % (self.name, CUSTOMER_COUNT))
        print("%s dead ..... ============ " % self.name)
# Shared state for the producer/consumer threads.
# Number of rows handled by Customer threads; main() resets it for each document type.
CUSTOMER_COUNT = 0
# Bounded queue (capacity MAX_LIST_NUMBER) filled by Producer and read by Customer threads.
QUEUE = Queue(MAX_LIST_NUMBER)
# Unbounded queue of rows whose indexing raised inside a Customer thread; main() retries them.
UN_SUCCESS_DOC_QUEUE = Queue()
def while_func(queue, es_conn, doc_type):
    """
    Drain *queue*, indexing every row through *es_conn*.

    A row whose indexing raises is reported on stdout, followed by a one
    second pause; it is not put back on any queue.

    :param queue: queue of database rows.
    :param es_conn: ES client passed on to ``add_index``.
    :param doc_type: ES document type for the rows.
    """
    while True:
        if queue.empty():
            break
        row = queue.get(1)
        document = build_doc(row)
        try:
            add_index(es_conn, document, doc_type=doc_type)
        except Exception:
            print("while_func exception , add data to UN_SUCCESS_DOC_QUEUE.......")
            print(row)
            time.sleep(1)
    print("while_func called......")
def main():
    """
    Build the ES index for every document type.

    For each (document type, SQL query) pair: put the mapping, start one
    Producer that feeds QUEUE from the query, start MAX_THREADS Customer
    threads, then drain whatever the consumers left behind, retry the rows
    that failed in the consumers, and finally optimize and refresh the index.
    """
    global CUSTOMER_COUNT

    type_and_sql = [(CIDIAN_ES_TYPE_NAME, CIDIAN_SQL), (RENWU_ES_TYPE_NAME, RENWU_SQL),
                    (BAIKE_ES_TYPE_NAME, BAIKE_SQL)]
    es_conn = get_es_connection()
    create_index(es_conn)
    for doc_type, sql in type_and_sql:
        # Reset the shared counter for this document type.
        CUSTOMER_COUNT = 0
        print("%s, start at : %s =========== " % (doc_type, convert_chn_time(datetime.datetime.now())))
        put_mapping(es_conn, ES_DOCUMENT_MAPPING, doc_type=doc_type)

        producer = Producer(get_mssql_cursor(sql))
        producer.start()
        print("%s producer started, ......" % doc_type)
        # Give the producer a head start: consumers exit as soon as the
        # queue holds 100 items or fewer.
        time.sleep(5)

        consumers = [Customer(doc_type) for _ in range(MAX_THREADS)]
        for consumer in consumers:
            consumer.start()
        print("all customers started, ......")
        for consumer in consumers:
            consumer.join()
        print("threads over CUSTOMER_COUNT is %s ================== " % CUSTOMER_COUNT)

        # Index what the consumers left behind. The producer may still be
        # running here, so keep draining until it has finished AND the queue
        # is empty; otherwise late rows would be lost or picked up under the
        # next document type.
        while_func(QUEUE, es_conn, doc_type)
        while producer.is_alive() or not QUEUE.empty():
            if QUEUE.empty():
                # Producer is still fetching; wait briefly instead of spinning.
                time.sleep(0.1)
            else:
                while_func(QUEUE, es_conn, doc_type)
        producer.join()

        # Retry the rows that failed inside the consumer threads.
        while_func(UN_SUCCESS_DOC_QUEUE, es_conn, doc_type)

        try:
            es_conn = get_es_connection()
            optimize(es_conn)
            refresh(es_conn)
            print("optimize and refresh called......")
        except Exception:
            # Still best-effort, but report the failure instead of hiding it.
            traceback.print_exc()
        print("%s, done at : %s ========== " % (doc_type, convert_chn_time(datetime.datetime.now())))
        print("")
        print("")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment