An example of creating Elasticsearch indexes with multiple threads operating on a synchronized queue, processed with a producer/consumer data model.
#coding=utf-8
"""
Baike (encyclopedia) ES search engine initialization module.
"""
from Queue import Queue
import datetime
import time
import threading
import traceback
import pyes
import pymssql
from django.template.defaultfilters import striptags

__author__ = 'lyhapple'
########################### Configuration #####################################
# Maximum number of items in the synchronized queue
MAX_LIST_NUMBER = 10000
# Maximum number of threads writing to the ES index. Do not raise this above 2:
# with more than 2 threads, ES keeps failing to obtain a connection to the index
# store; pyes may be incomplete here, or may be leaking connections.
MAX_THREADS = 2
# ES server
ES_SERVER_IP_AND_PORT = ("http", '192.168.1.191', '9200')
# Common index name for the Baike system; the whole system uses this one index
BAIKE_ES_INDEX_NAME = u"baike_index"
# Document type names for entries; the whole system uses these names
BAIKE_ES_TYPE_NAME = u"baike"
RENWU_ES_TYPE_NAME = u"renwu"
CIDIAN_ES_TYPE_NAME = u"cidian"
RENWU_SQL = """select id, dxid, name, intro, '/renwu/' + convert(varchar(255), dxid) + '.html'
from baike_renwuarticle"""
BAIKE_SQL = """select id, dxid, title, contents, '/baike/' + convert(varchar(255), dxid) + '.html'
from baike_baikearticle """
CIDIAN_SQL = """select id, dxid, original, translate, '/cidian/' + convert(varchar(255), dxid) + '.html'
from baike_dictionary """
MSSQL = {
    "ip": "192.168.1.207",
    "port": "1433",
    "user": "sa",
    "password": "delicacy",
    "db": "baike"
}
# ES document mapping definition
ES_DOCUMENT_MAPPING = {
    "id": {
        "type": "string",
        "index": "not_analyzed",
        "store": "yes"
    },
    "dxid": {
        "type": "string",
        "index": "not_analyzed",
        "store": "yes"
    },
    "title": {
        "type": "string",
        "indexAnalyzer": "smartcn",
        "searchAnalyzer": "smartcn",
        "store": "yes"
    },
    "content": {
        "type": "string",
        "indexAnalyzer": "smartcn",
        "searchAnalyzer": "smartcn",
        "store": "yes"
    },
    "url": {
        "type": "string",
        "index": "not_analyzed",
        "store": "yes"
    },
}
################################### End of configuration #####################################
def get_mssql_conn():
    """
    Get a SQL Server database connection.
    """
    conn = pymssql.connect(server=MSSQL["ip"], port=MSSQL["port"], user=MSSQL["user"],
                           password=MSSQL["password"], database=MSSQL["db"], charset="UTF-8")
    return conn
def get_es_connection(default_index=BAIKE_ES_INDEX_NAME):
    """
    Get an ES server connection.
    """
    return pyes.ES([ES_SERVER_IP_AND_PORT], default_indices=default_index, timeout=60)
def convert_chn_time(dt):
    """Format a datetime as 'YYYY-MM-DD HH:MM:SS'."""
    if not dt:
        return ""
    return dt.strftime('%Y-%m-%d %H:%M:%S')
def create_index(conn, index_name=BAIKE_ES_INDEX_NAME):
    try:
        # Create the index; set the refresh interval, replica count and shard count
        conn.create_index(index_name, {"refresh_interval": 0,
                                       "number_of_replicas": 0,
                                       "number_of_shards": 4})
    except Exception:
        traceback.print_exc()
def put_mapping(conn, mapping, index_names=None, doc_type=None):
    """Put the document mapping on the index."""
    if not index_names:
        index_names = [BAIKE_ES_INDEX_NAME]
    conn.put_mapping(doc_type=doc_type, mapping={'properties': mapping}, indices=index_names)
def add_index(conn, doc, index_name=BAIKE_ES_INDEX_NAME, doc_type=None):
    """Index one article document."""
    if not doc_type:
        print "please set doc_type value"
        return
    eid = doc["dxid"] if "dxid" in doc else doc["id"]
    conn.index(doc, index_name, doc_type, eid)
def optimize(conn, index_names=None):
    """Optimize the index."""
    if not index_names:
        index_names = [BAIKE_ES_INDEX_NAME]
    conn.optimize(index_names)
def refresh(conn, index_names=None):
    """Refresh the index."""
    if not index_names:
        index_names = [BAIKE_ES_INDEX_NAME]
    conn.refresh(index_names)
def get_mssql_cursor(sql):
    cursor = get_mssql_conn().cursor()
    cursor.execute(sql)
    return cursor
def build_doc(data):
    doc = {
        u"id": data[0],
        u"dxid": unicode(data[1]),
        u"title": unicode(data[2]),
        u"content": unicode(striptags(data[3]).replace("\n", '')),
        u"url": unicode(data[4])
    }
    return doc
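# Example (illustrative row only): a BAIKE_SQL result row such as
#     (1, 12345, u"Title", u"<p>Body</p>\n", u"/baike/12345.html")
# becomes
#     {u"id": 1, u"dxid": u"12345", u"title": u"Title",
#      u"content": u"Body", u"url": u"/baike/12345.html"}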
class Producer(threading.Thread):
    """Producer thread: reads rows from SQL Server and feeds the queue."""
    def __init__(self, cursor):
        super(Producer, self).__init__()
        self.cursor = cursor
    def run(self):
        count = 0
        global QUEUE
        while True:
            # if QUEUE.qsize() == MAX_LIST_NUMBER:  # for testing
            #     break
            try:
                data = self.cursor.next()
                QUEUE.put(data, 1)
                count += 1
                if count % MAX_LIST_NUMBER == 0:
                    print "add to QUEUE ===> %s" % count
            except UnicodeDecodeError:
                continue
            except StopIteration:
                break
        print "all done, and db total count is %s ******************" % count
        print "Producer thread stop.............."
class Customer(threading.Thread):
    """Consumer thread: drains the queue and writes documents into ES."""
    def __init__(self, doc_type):
        super(Customer, self).__init__()
        self.doc_type = doc_type
    def run(self):
        global CUSTOMER_COUNT
        global QUEUE
        conn = get_es_connection()
        print "conn from %s ===> %s " % (self.name, id(conn))
        # Stop once roughly 100 items remain; the main thread drains the rest
        while QUEUE.qsize() > 100:
            data = QUEUE.get(1)
            doc = build_doc(data)
            try:
                add_index(conn, doc, doc_type=self.doc_type)
            except Exception:
                UN_SUCCESS_DOC_QUEUE.put(data)
            CUSTOMER_COUNT += 1
            if CUSTOMER_COUNT % MAX_LIST_NUMBER == 0:
                print "insert into ES ===> %s ===> %s " % (self.name, CUSTOMER_COUNT)
        print "%s dead ..... ============ " % self.name
# Shared state: running insert count, the synchronized work queue, and a queue
# for documents that failed to index in the consumer threads
CUSTOMER_COUNT = 0
QUEUE = Queue(MAX_LIST_NUMBER)
UN_SUCCESS_DOC_QUEUE = Queue()
def while_func(queue, es_conn, doc_type):
    """Drain the given queue in the main thread, indexing each document."""
    while not queue.empty():
        data = queue.get(1)
        doc = build_doc(data)
        try:
            add_index(es_conn, doc, doc_type=doc_type)
        except Exception:
            # Do not re-queue: when draining UN_SUCCESS_DOC_QUEUE itself that
            # could loop forever. Just report the failed document.
            print "while_func exception, failed to index document......."
            print data
            time.sleep(1)
    print "while_func called......"
def main():
    type_and_sql = [(CIDIAN_ES_TYPE_NAME, CIDIAN_SQL), (RENWU_ES_TYPE_NAME, RENWU_SQL),
                    (BAIKE_ES_TYPE_NAME, BAIKE_SQL)]
    #type_and_sql = [(CIDIAN_ES_TYPE_NAME, CIDIAN_SQL)]
    es_conn = get_es_connection()
    create_index(es_conn)
    for doc_type, sql in type_and_sql:
        # Reset the counter
        global CUSTOMER_COUNT
        CUSTOMER_COUNT = 0
        print "%s, start at : %s =========== " % (doc_type, convert_chn_time(datetime.datetime.now()))
        put_mapping(es_conn, ES_DOCUMENT_MAPPING, doc_type=doc_type)
        cursor = get_mssql_cursor(sql)
        p = Producer(cursor)
        p.start()
        print "%s producer started, ......" % doc_type
        time.sleep(5)
        threads = []
        for x in range(MAX_THREADS):
            customer = Customer(doc_type)
            threads.append(customer)
        nloops = range(len(threads))
        for n in nloops:
            threads[n].start()
        print "all customers started, ......"
        for n in nloops:
            threads[n].join()
        print "threads over CUSTOMER_COUNT is %s ================== " % CUSTOMER_COUNT
        # Index the last ~100 items the consumer threads left in the queue
        global QUEUE
        while_func(QUEUE, es_conn, doc_type)
        # Retry documents that failed in the consumer threads
        global UN_SUCCESS_DOC_QUEUE
        while_func(UN_SUCCESS_DOC_QUEUE, es_conn, doc_type)
        try:
            es_conn = get_es_connection()
            optimize(es_conn)
            refresh(es_conn)
            print "optimize and refresh called......"
        except Exception:
            pass
        print "%s, done at : %s ========== " % (doc_type, convert_chn_time(datetime.datetime.now()))
        print ""
        print ""
if __name__ == "__main__":
    main()
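Once the indexing run finishes, the index can be queried over the same kind of pyes connection. Below is a minimal sketch, assuming the StringQuery/search API of the pyes release this gist targets (circa 2013); the query string and the printed fields are illustrative only:

#coding=utf-8
import pyes

conn = pyes.ES([("http", '192.168.1.191', '9200')], default_indices=u"baike_index", timeout=60)
# Full-text query against the smartcn-analyzed title field (example keyword)
query = pyes.StringQuery(u"keyword", default_field="title")
results = conn.search(query, indices=[u"baike_index"], doc_types=[u"baike"])
for doc in results:
    print doc["title"], doc["url"]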