Created
February 4, 2020 23:01
-
-
Save odeke-em/a17aa49854aeae1d83ffc14715f52d79 to your computer and use it in GitHub Desktop.
Auto-refreshing transaction prototype that works around Spanner's 10-second idle-transaction limit: https://cloud.google.com/spanner/docs/reference/rest/v1/TransactionOptions#idle-transactions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2020 Google LLC | |
# | |
# Use of this source code is governed by a BSD-style | |
# license that can be found in the LICENSE file or at | |
# https://developers.google.com/open-source/licenses/bsd | |
import threading | |
import time | |
from google.cloud import spanner_v1 as spanner | |
class AutoRefreshingTransaction(spanner.transaction.Transaction):
    """Transaction that keeps itself from going idle on the server.

    After ``begin()`` a background thread runs ``periodic_ping``; whenever
    the transaction has seen no activity for ``__MAX_TIMEOUT_SECS`` seconds
    it sends a ``SELECT 1=1`` query and drains the results, which counts as
    activity. ``commit()`` and ``rollback()`` stop the background thread.

    NOTE(review): pings are issued from the background thread and are not
    serialized against statements issued by the caller's thread; confirm
    the base Transaction tolerates that, or serialize them explicitly.
    """

    __MAX_TIMEOUT_SECS = 8

    # Class-level defaults so that methods invoked before
    # initialize_auto_refresh() degrade to plain Transaction behaviour
    # instead of raising AttributeError on a missing attribute.
    __shared_mem = None
    __lock = None
    __pth = None

    def initialize_auto_refresh(self):
        """Start the background pinging thread for this transaction.

        This method exists because getting a Transaction from a session
        pool returns an already instantiated Transaction, thus we can't
        rely on an ``__init__`` method. Calling it again while a pinging
        thread is still alive is a no-op, so no thread is orphaned.
        """
        if self.__pth is not None and self.__pth.is_alive():
            return

        self.__shared_mem = {
            'running': True,
            # The transaction counts as active from the moment refreshing
            # starts, so the first ping is due one full interval from now.
            'last_active_time': time.time(),
        }
        self.__lock = threading.RLock()

        # Daemon thread: a transaction that is never committed or rolled
        # back must not keep the interpreter from exiting.
        pth = threading.Thread(
            target=periodic_ping,
            name='txn-freshness-ping',
            args=(
                self.__MAX_TIMEOUT_SECS,
                self.__ping,
                self.__shared_mem,
                self.__lock,
            ),
            daemon=True,
        )
        pth.start()
        self.__pth = pth

    def begin(self, *args, **kwargs):
        """Begin the transaction, then start auto-refreshing it.

        Arguments are forwarded unchanged to the parent ``begin``; its
        result is returned.
        """
        res = super().begin(*args, **kwargs)
        self.initialize_auto_refresh()
        return res

    def execute_sql(self, *args, **kwargs):
        """Record activity, run the query, and return the parent's result."""
        self.__record_as_active()
        return super().execute_sql(*args, **kwargs)

    def execute_update(self, *args, **kwargs):
        """Record activity, run the DML, and return the parent's result."""
        self.__record_as_active()
        return super().execute_update(*args, **kwargs)

    def commit(self, *args, **kwargs):
        """Commit the transaction and stop the pinging thread.

        The pinging thread is stopped even when the parent ``commit``
        raises, so a failed commit does not leave a thread pinging forever.
        """
        self.__record_as_active()
        try:
            return super().commit(*args, **kwargs)
        finally:
            self.end()

    def rollback(self, *args, **kwargs):
        """Roll back the transaction and stop the pinging thread.

        The pinging thread is stopped even when the parent ``rollback``
        raises.
        """
        self.__record_as_active()
        try:
            return super().rollback(*args, **kwargs)
        finally:
            self.end()

    def __record_as_active(self):
        """Stamp the shared state with the current time as last activity."""
        if self.__lock is None:
            # Auto-refresh was never initialized; nothing to record.
            return
        with self.__lock:
            self.__shared_mem['last_active_time'] = time.time()

    def end(self):
        """Signal the pinging thread to stop and wait for it to exit.

        Safe to call more than once, and a no-op when auto-refresh was
        never initialized. The wait can last until the pinging thread's
        current sleep finishes.
        """
        if self.__lock is None:
            return
        with self.__lock:
            self.__shared_mem['running'] = False
        # Join outside the lock so the pinging thread can still acquire it
        # to observe the stop flag.
        self.__pth.join()
        print('ended')

    def __ping(self):
        """Send the keep-alive query, drain its results, record activity."""
        # Call the parent directly: the keep-alive only counts as activity
        # once its results have actually been retrieved.
        res = super().execute_sql('SELECT 1=1')
        print('pinging')
        for it in res:
            print(it)
        self.__record_as_active()
def periodic_ping(max_timeout_secs, ping, shared_mem, lock):
    """Call ``ping`` whenever the shared state has been idle for too long.

    Runs until ``shared_mem['running']`` becomes falsy; intended to be the
    target of a background thread.

    Instead of always sleeping a full ``max_timeout_secs`` (which lets the
    idle gap grow to almost twice that value when activity happens shortly
    after a wake-up), the loop sleeps only until the moment the last
    recorded activity becomes ``max_timeout_secs`` old.

    Args:
        max_timeout_secs: Idle time, in seconds, after which ``ping`` is
            invoked.
        ping: Zero-argument callable. It is invoked without ``lock`` held.
        shared_mem: Dict with the keys ``'running'`` (loop continues while
            truthy) and ``'last_active_time'`` (a ``time.time()`` stamp).
        lock: Lock guarding ``shared_mem``; must support the ``with``
            statement.
    """
    wait_secs = max_timeout_secs
    while True:
        with lock:
            if not shared_mem['running']:
                return

        time.sleep(wait_secs)

        # Read the stop flag and the activity stamp under the same lock so
        # both come from one consistent snapshot.
        with lock:
            if not shared_mem['running']:
                return
            idle_secs = time.time() - shared_mem['last_active_time']

        if idle_secs >= max_timeout_secs:
            ping()
            # Always wait a full interval after a ping, so a ping that does
            # not refresh 'last_active_time' cannot turn this into a busy
            # loop.
            wait_secs = max_timeout_secs
        else:
            # Wake up exactly when the idle budget runs out; the clamp
            # covers a 'last_active_time' that lies in the future.
            wait_secs = min(max_timeout_secs, max_timeout_secs - idle_secs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment