Last active
January 19, 2016 05:22
-
-
Save eungju/9480389 to your computer and use it in GitHub Desktop.
Thrift HA
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ThriftConnection(object): | |
def __init__(self, service, host, port): | |
socket = TSocket.TSocket(host, port) | |
self.transport = TTransport.TFramedTransport(socket) | |
protocol = TBinaryProtocolAccelerated(self.transport) | |
self.client = service.Client(protocol) | |
self.transport.open() | |
self.open_time = time.time() | |
self.access_time = self.open_time | |
self.str = "%s#%d(%s:%d/%s)" % (self.__class__.__name__, id(self), host, port, service.__name__.rsplit(".", 1)[-1]) | |
def close(self): | |
try: | |
self.transport.close() | |
except Exception: | |
pass | |
def touch(self): | |
self.access_time = time.time() | |
def __str__(self): | |
return self.str | |
class ThriftServer(object): | |
def __init__(self, service, host, port): | |
self.service = service | |
self.host = host | |
self.port = port | |
def acquire(self): | |
return ThriftConnection(self.service, self.host, self.port) | |
def release(self, conn): | |
conn.close() | |
def abandon(self, conn): | |
conn.close() | |
@contextmanager | |
def client(self): | |
conn = self.acquire() | |
try: | |
yield conn.client | |
finally: | |
self.release(conn) | |
def __str__(self): | |
return "%s#%d(%s:%d/%s)" % (self.__class__.__name__, id(self), self.host, self.port, self.service.__name__.rsplit(".", 1)[-1]) | |
class ThriftPool(object): | |
def __init__(self, service, host, port, pool_conf=None): | |
self.pid = os.getpid() | |
self.service = service | |
self.host = host | |
self.port = port | |
self.pool_conf = {"max": 0, "min": 1, "timeout": 0.1, "idle_timeout": 30, "max_lifetime": 30 * 60} | |
if pool_conf: | |
self.pool_conf.update(pool_conf) | |
self.pool_timeout = self.pool_conf["timeout"] | |
self.pool_idle_timeout = self.pool_conf["idle_timeout"] | |
self.pool_max_lifetime = self.pool_conf["max_lifetime"] | |
self.pool = LifoQueue(self.pool_conf["max"]) | |
self.diet() | |
def dispose(self): | |
self.clear() | |
def clear(self): | |
while True: | |
try: | |
conn = self.pool.get(block=False) | |
if conn: | |
self.abandon(conn) | |
except Empty: | |
break | |
def diet(self): | |
for i in range(self.pool_conf["min"]): | |
try: | |
self.pool.put(None, block=False) | |
except Full: | |
break | |
def acquire(self): | |
conn = None | |
try: | |
conn = self.pool.get(block=True, timeout=self.pool_timeout) | |
except Empty: | |
logger.warning("No idle connection, create one more.") | |
if conn: | |
now = time.time() | |
if (now - conn.access_time > self.pool_idle_timeout) or (now - conn.open_time > self.pool_max_lifetime): | |
logger.info("Discard obsolete connection %s." % conn) | |
conn.close() | |
conn = None | |
return conn or ThriftConnection(self.service, self.host, self.port) | |
def release(self, conn): | |
try: | |
conn.touch() | |
self.pool.put(conn, block=False) | |
except Full: | |
logger.warning("The pool is full, discard connection %s." % conn) | |
conn.close() | |
def abandon(self, conn): | |
conn.close() | |
self.clear() | |
self.diet() | |
@contextmanager | |
def client(self): | |
if os.getpid() != self.pid: | |
logger.info("This pool is created by other process, reinitializing it.") | |
self.dispose() | |
self.__init__(self.service, self.host, self.port, pool_conf=self.pool_conf) | |
conn = self.acquire() | |
try: | |
yield conn.client | |
self.release(conn) | |
except Thrift.TException: | |
self.abandon(conn) | |
raise | |
except: | |
self.release(conn) | |
raise | |
def __str__(self): | |
return "%s#%d(%s:%d/%s)" % (self.__class__.__name__, id(self), self.host, self.port, self.service.__name__.rsplit(".", 1)[-1]) | |
class ThriftHA(object): | |
def __init__(self, replicas): | |
self.replicas = replicas[:] | |
self.retry = len(self.replicas) + 1 | |
self.unavailable = {} | |
def pick_available(self): | |
# logger.debug("Pick one from available ones.") | |
for replica in self.replicas: | |
if replica not in self.unavailable: | |
return replica | |
if time.time() > self.unavailable[replica][1]: | |
logger.info("Give a chance to %s." % replica) | |
return replica | |
logger.info("All replicas are unavailable. Give a chance to the primary.") | |
return self.replicas[0] | |
def mark_unavailable(self, replica): | |
count = self.unavailable.get(replica, (0, None))[0] + 1 | |
duration = 2 ** min(count, 10) | |
logger.info("Mark %s as unavailable. Backoff %d seconds." % (replica, duration)) | |
self.unavailable[replica] = (count, time.time() + duration) | |
def mark_available(self, replica): | |
if replica in self.unavailable: | |
logger.info("Mark %s as available." % replica) | |
del self.unavailable[replica] | |
@contextmanager | |
def client(self): | |
acquire_error = None | |
for i in xrange(self.retry): | |
# logger.debug("Trying to acquire a connection.") | |
try: | |
replica = self.pick_available() | |
conn = replica.acquire() | |
acquire_error = None | |
self.mark_available(replica) | |
# logger.debug("Acquired %s from %s." % (conn, replica)) | |
break | |
except Exception, e: | |
logger.exception("Error while acquiring a connection from %s." % replica) | |
acquire_error = e | |
self.mark_unavailable(replica) | |
if acquire_error is not None: | |
logger.error("No available replica.") | |
raise acquire_error | |
try: | |
yield conn.client | |
replica.release(conn) | |
except Thrift.TException: | |
replica.abandon(conn) | |
self.mark_unavailable(replica) | |
raise | |
except: | |
replica.release(conn) | |
raise | |
helloworld = ThriftHA([ThriftPool(HelloWorld, host, 1234, pool_conf={"max": 3}) for host in ["primary.localdomain", "fallback.localdomain"]]) | |
with helloworld.client() as client: | |
print client.hello("world") |
High Availability 혹은 Highly Available의 약자입니다.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
코드 잘보았습니다. 저도 파이썬으로 thrift로 쓰고 있었는데 위에 코드를 사용하면 좋겠네요.
그런데 HA의 뜻은 무언가요?