Last active
March 18, 2021 19:51
-
-
Save xudifsd/8e3df2a7ce14eb50671d5c60ca5f3e36 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import os
import sys
import traceback
import sqlite3
import logging
import threading
import multiprocessing
import functools
import signal
import faulthandler  # since python 3.3 https://docs.python.org/3/library/faulthandler.html
import configparser

try:
    # Python 3 (this file already depends on 3.x-only configparser/faulthandler)
    from queue import Queue
    from queue import Empty
except ImportError:  # Python 2 fallback
    from Queue import Queue
    from Queue import Empty
log = logging.getLogger("py_utils") | |
#lib_dir_path = os.path.join(os.path.dirname(__file__), "../py-lib") | |
# | |
#sys.path.append(lib_dir_path) | |
#sys.path.append(os.path.join(lib_dir_path, "protobuf-3.2.0-py2.7.egg")) | |
#sys.path.append(os.path.join(lib_dir_path, "six-1.10.0-py2.py3-none-any.whl")) | |
#sys.path.append(os.path.join(lib_dir_path, "thrift")) | |
from thrift import Thrift | |
from thrift.transport import TSocket | |
from thrift.transport import TTransport | |
from thrift.protocol import TCompactProtocol | |
class Connect:
    """Helper around a service reachable at one address/port pair.

    ``curl`` issues a plain HTTP request against the endpoint; ``connect``
    opens a framed + buffered thrift transport and returns a ready client.
    """

    def __init__(self, address, port):
        self.__address = address
        self.__port = port

    def curl(self, httpType, url, param=None):
        """Issue an HTTP request and return the stripped response body.

        httpType -- HTTP verb, e.g. "GET", "POST", "PUT"
        url      -- path component of the request
        param    -- optional dict of form/query parameters
        """
        # Local imports: the original referenced the Python 2 names
        # urllib.urlencode / httplib without importing either (NameError).
        import urllib.parse
        import http.client

        if not (self.__address and self.__port):
            raise ValueError("should provide address and port")
        # `param or {}` avoids the shared mutable default the original used.
        content = urllib.parse.urlencode(param or {})
        connection = http.client.HTTPConnection(self.__address, self.__port)
        if httpType != 'POST' and httpType != 'PUT':
            # Verbs without a body carry the parameters in the query string.
            paramUrl = url + '?' + content
            connection.request(httpType, paramUrl)
        else:
            headers = {"Content-type": "application/x-www-form-urlencoded;charset=UTF-8"}
            connection.request(httpType, url, content, headers)
        response = connection.getresponse()
        result = response.read().strip()
        return result

    def connect(self, Client):
        """Open a thrift transport and return an instance of `Client`.

        Raises ValueError when address/port were not provided.
        """
        # The original used TBinaryProtocol without importing it (the file
        # only imports TCompactProtocol), which raised NameError here.
        from thrift.protocol import TBinaryProtocol

        if not (self.__address and self.__port):
            raise ValueError("should provide address and port")
        self.__transport = TSocket.TSocket(self.__address, self.__port)
        self.__transport = TTransport.TFramedTransport(self.__transport)
        self.__transport = TTransport.TBufferedTransport(self.__transport)
        self.__protocol = TBinaryProtocol.TBinaryProtocol(self.__transport)
        self.__client = Client(self.__protocol)
        self.__transport.open()
        return self.__client

    def close(self):
        """Close the thrift transport, if `connect` was ever called."""
        # getattr guard: the original raised AttributeError when close()
        # was called before connect() had set the attribute.
        if getattr(self, "_Connect__transport", None):
            self.__transport.close()
class Sqlite3Writer:
    """Single-threaded sqlite3 writer.

    sqlite3 connections cannot be safely shared across threads, so exactly
    one thread runs process(); every other thread submits write statements
    through `queue` as tuples suitable for `cursor.execute(*item)`, i.e.
    ``(sql,)`` or ``(sql, params)``.
    """

    CREATE_HOST_TABLE = """CREATE TABLE IF NOT EXISTS host (
    cluster_name text NOT NULL,
    ip text NOT NULL,
    host_name text,
    tags text)"""

    CREATE_HOST_INDEX = """CREATE INDEX IF NOT EXISTS host_index ON host (cluster_name, ip);"""

    CREATE_JOB_TABLE = """CREATE TABLE IF NOT EXISTS job (
    cluster_name text NOT NULL,
    user_name text NOT NULL,
    service_name text NOT NULL,
    offset integer NOT NULL,
    host_ip text)"""

    CREATE_JOB_INDEX = """CREATE INDEX IF NOT EXISTS job_index ON job (
    cluster_name, user_name, service_name, host_ip);"""

    def __init__(self, queue, db_path="data/matrix_data.db"):
        """Create tables/indexes if missing; `queue` feeds process()."""
        self.db_path = db_path
        # Ensure the parent directory exists, otherwise sqlite3.connect on
        # the default "data/..." path fails with "unable to open database".
        parent = os.path.dirname(self.db_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute(Sqlite3Writer.CREATE_HOST_TABLE)
            cursor.execute(Sqlite3Writer.CREATE_HOST_INDEX)
            cursor.execute(Sqlite3Writer.CREATE_JOB_TABLE)
            cursor.execute(Sqlite3Writer.CREATE_JOB_INDEX)
            conn.commit()
        finally:
            # Original leaked the connection if table creation raised.
            conn.close()
        self.running = True
        self.queue = queue

    def process(self):
        """Consume queued statements until close() is called.

        Commits every 1000 statements, or whenever the queue has been idle
        for one second; a final commit happens on shutdown. Failures of a
        single statement are logged and skipped, not fatal.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            batch = 0
            while self.running:
                data = None  # keep bound for the log line below
                try:
                    data = self.queue.get(True, 1)
                    # Original created (and leaked) a fresh cursor per item
                    # while the `cursor` above went unused.
                    cursor.execute(*data)
                    batch += 1
                    if batch > 1000:
                        conn.commit()
                        batch = 0
                except Empty:
                    # Queue idle: flush whatever is pending.
                    if batch > 0:
                        conn.commit()
                        batch = 0
                except Exception as e:
                    traceback.print_exc()
                    # log.warn is deprecated; lazy %-args instead of eager %.
                    log.warning("data is %s, error is %s", data, e)
        finally:
            conn.commit()
            conn.close()

    def close(self):
        """Ask process() to stop after its current 1-second poll."""
        self.running = False
def thread_pool_map(target, args, thread_pool_size=5):
    """Apply `target` to each tuple in `args` on a bounded pool of threads.

    Blocks until every task has finished and returns a list of results in
    the same order as `args` — None for a task that raised, or for an arg
    that was skipped because it is not a tuple.

    target           -- callable invoked as target(*arg)
    args             -- iterable of argument tuples
    thread_pool_size -- cap on concurrently running threads
    """
    def wrapper(queue, *args, **kwargs):
        # Run target, log any failure, and always deliver a result (None on
        # error) so the collector below never loses positional alignment.
        result = None
        try:
            result = target(*args, **kwargs)
        except Exception as e:
            log.info("apply target failed with args %s, kwargs %s", str(args), str(kwargs))
            log.exception(e)
        finally:
            queue.put(result)

    thread_pool = []
    result = []
    for arg in args:
        # Throttle: wait until a running thread finishes before adding more.
        # `>=` fixes the original off-by-one that allowed pool_size+1 threads.
        if len(thread_pool) >= thread_pool_size:
            joined = False
            while not joined:
                for i in range(len(thread_pool)):  # xrange is Python 2 only
                    # TODO we can reuse finished thread
                    t = thread_pool[i]
                    t.join(0.2)
                    if not t.is_alive():  # isAlive() was removed in Python 3.9
                        joined = True
                        thread_pool.pop(i)
                        break
        queue = Queue()
        result.append(queue)
        processor = functools.partial(wrapper, queue)
        if not isinstance(arg, tuple):
            # Keep the placeholder queue appended above so results stay
            # aligned with args; its getter simply yields None.
            log.warning("arg %s is not of type tuple, ignore it", str(arg))
            continue
        thread_pool.append(threading.Thread(target=processor, args=arg,
                                            name="thread-worker-" + str(len(result))))
        thread_pool[-1].start()
    log.info("finished adding all tasks")
    for t in thread_pool:
        t.join()
    log.info("all tasks finished")

    def getter(queue):
        # None when the worker never ran or never produced a value.
        try:
            return queue.get(False)
        except Empty:
            return None

    # A list, not a lazy py3 `map`, so the promised "block until finished"
    # results are materialized for the caller.
    return [getter(q) for q in result]
def process_pool_map(target, args, process_pool_size=5):
    """Apply `target` to each tuple in `args` on a bounded pool of processes.

    Blocks until every task has finished and returns a list of results in
    the same order as `args` — None for a task that raised, or for an arg
    that was skipped because it is not a tuple.

    target            -- callable invoked as target(*arg).
                         NOTE(review): the nested wrapper below is not
                         picklable, so this helper only works with the
                         "fork" start method (Linux default) — confirm
                         before use under "spawn"/"forkserver".
    args              -- iterable of argument tuples
    process_pool_size -- cap on concurrently running processes
    """
    def wrapper(queue, *args, **kwargs):
        """Run target in the child, log failures, always deliver a result."""
        result = None
        try:
            result = target(*args, **kwargs)
        except Exception as e:
            log.info("apply target failed with args %s, kwargs %s", str(args), str(kwargs))
            log.exception(e)
        finally:
            queue.put(result)

    process_pool = []
    result = []
    for arg in args:
        # Throttle: wait until a running process finishes before adding more.
        # `>=` fixes the original off-by-one that allowed pool_size+1 procs.
        if len(process_pool) >= process_pool_size:
            joined = False
            while not joined:
                for i in range(len(process_pool)):  # xrange is Python 2 only
                    # TODO we can reuse finished process
                    t = process_pool[i]
                    t.join(0.2)
                    if not t.is_alive():
                        joined = True
                        process_pool.pop(i)
                        break
        queue = multiprocessing.Queue()
        result.append(queue)
        processor = functools.partial(wrapper, queue)
        if not isinstance(arg, tuple):
            # Keep the placeholder queue appended above so results stay
            # aligned with args; its getter simply yields None.
            log.warning("arg %s is not of type tuple, ignore it", str(arg))
            continue
        process_pool.append(multiprocessing.Process(target=processor, args=arg,
                                                    name="processor-worker-" + str(len(result))))
        process_pool[-1].start()
    log.info("finished adding all tasks to " + str(target))
    for t in process_pool:
        t.join()
    log.info("all tasks finished for tasks " + str(target))

    def getter(queue):
        """Return the queued result, or None if the queue is empty."""
        try:
            # multiprocessing.Queue raises queue.Empty on a non-blocking miss.
            return queue.get(False)
        except Empty:
            return None

    # Materialize a list rather than returning a lazy py3 `map` object.
    return [getter(q) for q in result]
def register_stack_trace_dump():
    """Install a SIGTRAP handler that dumps the traceback of all threads.

    Sending SIGTRAP to the process then prints every thread's stack via
    faulthandler; `chain=False` replaces any previous SIGTRAP handler.
    """
    # signal.SIGTRAP does not exist on Windows; the original raised
    # AttributeError there, so skip the registration instead.
    if hasattr(signal, "SIGTRAP"):
        faulthandler.register(signal.SIGTRAP, all_threads=True, chain=False)
def load_and_write_ini(file):
    """Round-trip an INI file: read it, print its "foo" section, replace
    that section, and write the result back in place.

    file -- path to the INI file; it must already contain a [foo] section
            (KeyError otherwise).
    """
    config = configparser.ConfigParser()
    config.optionxform = str  # make option names case sensitive
    config.read(file)
    print(config["foo"])
    # ConfigParser sections must be assigned a mapping; the original
    # assigned the bare string "bar", which raises at runtime.
    config["foo"] = {"bar": "baz"}
    with open(file, "w") as configfile:
        config.write(configfile)
if __name__ == '__main__':
    # Smoke test: install the SIGTRAP stack-dump handler, configure logging,
    # then push a trivial increment workload through the thread pool.
    register_stack_trace_dump()
    logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
    tasks = [(value,) for value in range(100)]
    print(thread_pool_map(lambda value: value + 1, tasks))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment