Last active
February 18, 2020 07:42
-
-
Save schcriher/e6f04c43c59625bef42613f98882af7a to your computer and use it in GitHub Desktop.
Persistence of jobs - PTB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import pickle | |
from threading import Thread, Event | |
from time import time, sleep | |
from datetime import timedelta | |
from telegram import Update | |
from telegram.ext import Updater, Filters, CommandHandler, MessageHandler, Job | |
INI = time() | |
JOBS_PICKLE = 'job_tuples.pickle' | |
JOB_DATA = ('callback', 'interval', 'repeat', 'context', 'days', 'name', 'tzinfo') | |
JOB_STATE = ('_remove', '_enabled') | |
def load_jobs(jq): | |
names = [] | |
with open(JOBS_PICKLE, 'rb') as fp: | |
while True: | |
try: | |
next_t, data, state = pickle.load(fp) | |
except EOFError: | |
break # Loaded all jobs | |
# New object with the same data | |
job = Job(**{var: val for var, val in zip(JOB_DATA, data)}) | |
# Restore the state it had | |
for var, val in zip(JOB_STATE, state): | |
attribute = getattr(job, var) | |
getattr(attribute, 'set' if val else 'clear')() | |
job.job_queue = jq | |
next_t -= time() # Convert from absolute to relative time | |
jq._put(job, next_t) | |
names.append(job.name) | |
print(f'[{time() - INI:5.2f}] load_jobs {", ".join(names)}') | |
def save_jobs(jq): | |
names = [] | |
with jq._queue.mutex: # in case job_queue makes a change | |
if jq: | |
job_tuples = jq._queue.queue | |
else: | |
job_tuples = [] | |
with open(JOBS_PICKLE, 'wb') as fp: | |
for next_t, job in job_tuples: | |
# This job is always created at the start | |
if job.name == 'save_jobs_job': | |
continue | |
data = tuple(getattr(job, var) for var in JOB_DATA) | |
state = tuple(getattr(job, var).is_set() for var in JOB_STATE) | |
sleep(0.3) # allow the collision of threads (not necessary here) | |
# Pickle the job | |
pickle.dump((next_t, data, state), fp) | |
names.append(job.name) | |
print(f'[{time() - INI:5.2f}] save_jobs {", ".join(names)}') | |
def save_jobs_job(context): | |
save_jobs(context.job_queue) | |
def message_job(context): | |
print(f'[{time() - INI:5.2f}] message_job {context.job.context}') | |
def main(): | |
print(f'[{time() - INI:5.2f}] START') | |
# ---TEST--------------------------------------------------------------------- | |
# Fake token | |
updater = Updater('123:abc', use_context=True) | |
# Patch for "get_me" and "start_webhook" | |
ret = b'{"ok":1,"result":{"id":1,"is_bot":1,"username":"b","first_name":"B"}}' | |
updater.bot._request._request_wrapper = lambda *a, **b: ret | |
updater._start_webhook = lambda *a, **b: None | |
# ---------------------------------------------------------------------------- | |
job_queue = updater.job_queue | |
# Periodically save jobs | |
job_queue.run_repeating(save_jobs_job, timedelta(seconds=3)) | |
try: | |
load_jobs(job_queue) | |
except FileNotFoundError: | |
# First run | |
pass | |
# ---TEST--------------------------------------------------------------------- | |
# timeline 0 1 2 3 4 5 6 7 8 | |
# +---+---+---+---+---+---+-|-+---+ | |
# save_jobs_job * * | | |
# message_job * * * * | * | |
# stop * | |
jobs = {} | |
for i in (2, 3.2, 4, 5, 8): | |
jobs[i] = job_queue.run_once(message_job, timedelta(seconds=i), context=i) | |
def cancel(): | |
sleep(3.1) | |
jobs[3.2].schedule_removal() | |
def stop(): | |
sleep(6.5) | |
updater.stop() | |
updater.is_idle = False # updater.stop() does not do it 😑 | |
Thread(target=cancel).start() | |
Thread(target=stop).start() | |
# ---------------------------------------------------------------------------- | |
updater.start_webhook() | |
updater.idle() | |
# Run again after bot has been properly shut down | |
save_jobs(job_queue) | |
print(f'[{time() - INI:5.2f}] END\n') | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import pickle | |
from threading import Thread, Event | |
from time import time, sleep | |
from datetime import timedelta | |
from telegram import Update | |
from telegram.ext import Updater, Filters, CommandHandler, MessageHandler | |
INI = time() | |
JOBS_PICKLE = 'job_tuples.pickle' | |
def load_jobs(jq): | |
names = [] | |
now = time() | |
with open(JOBS_PICKLE, 'rb') as fp: | |
while True: | |
try: | |
next_t, job = pickle.load(fp) | |
except EOFError: | |
break # Loaded all job tuples | |
# Create threading primitives | |
enabled = job._enabled | |
removed = job._remove | |
job._enabled = Event() | |
job._remove = Event() | |
if enabled: | |
job._enabled.set() | |
if removed: | |
job._remove.set() | |
next_t -= now # Convert from absolute to relative time | |
jq._put(job, next_t) | |
names.append(job.name) | |
print(f'[{time() - INI:5.2f}] load_jobs {", ".join(names)}') | |
def save_jobs(jq): | |
names = [] | |
if jq: | |
job_tuples = jq._queue.queue | |
else: | |
job_tuples = [] | |
with open(JOBS_PICKLE, 'wb') as fp: | |
for next_t, job in job_tuples: | |
# Back up objects | |
_job_queue = job._job_queue | |
_remove = job._remove | |
_enabled = job._enabled | |
# Replace un-pickleable threading primitives | |
job._job_queue = None # Will be reset in jq.put | |
job._remove = job.removed # Convert to boolean | |
job._enabled = job.enabled # Convert to boolean | |
sleep(0.3) # allow the collision of threads | |
# Pickle the job | |
pickle.dump((next_t, job), fp) | |
# Restore objects | |
job._job_queue = _job_queue | |
job._remove = _remove | |
job._enabled = _enabled | |
names.append(job.name) | |
print(f'[{time() - INI:5.2f}] save_jobs {", ".join(names)}') | |
def save_jobs_job(context): | |
save_jobs(context.job_queue) | |
def message_job(context): | |
print(f'[{time() - INI:5.2f}] message_job {context.job.context}') | |
def main(): | |
print(f'[{time() - INI:5.2f}] START') | |
# ---TEST--------------------------------------------------------------------- | |
# Fake token | |
updater = Updater('123:abc', use_context=True) | |
# Patch for "get_me" and "start_webhook" | |
ret = b'{"ok":1,"result":{"id":1,"is_bot":1,"username":"b","first_name":"B"}}' | |
updater.bot._request._request_wrapper = lambda *a, **b: ret | |
updater._start_webhook = lambda *a, **b: None | |
# ---------------------------------------------------------------------------- | |
job_queue = updater.job_queue | |
# Periodically save jobs | |
job_queue.run_repeating(save_jobs_job, timedelta(seconds=3)) | |
try: | |
load_jobs(job_queue) | |
except FileNotFoundError: | |
# First run | |
pass | |
# ---TEST--------------------------------------------------------------------- | |
# timeline 0 1 2 3 4 5 6 7 8 | |
# +---+---+---+---+---+---+-|-+---+ | |
# save_jobs_job * * | | |
# message_job * * * * | * | |
# stop * | |
jobs = {} | |
for i in (2, 3.2, 4, 5, 8): | |
jobs[i] = job_queue.run_once(message_job, timedelta(seconds=i), context=i) | |
def cancel(): | |
sleep(3.1) | |
jobs[3.2].schedule_removal() | |
def stop(): | |
sleep(6.5) | |
updater.stop() | |
updater.is_idle = False # updater.stop() does not do it 😑 | |
Thread(target=cancel).start() | |
Thread(target=stop).start() | |
# ---------------------------------------------------------------------------- | |
updater.start_webhook() | |
updater.idle() | |
# Run again after bot has been properly shut down | |
save_jobs(job_queue) | |
print(f'[{time() - INI:5.2f}] END\n') | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash | |
set -o errexit | |
set -o pipefail | |
set -o nounset | |
#set -o xtrace | |
VENV=venv | |
TRACE=trace.log | |
PICKLE=job_tuples.pickle | |
if [ ! -d $VENV ];then | |
python3 -m venv $VENV | |
fi | |
source $VENV/bin/activate | |
python3 -m pip install 'python-telegram-bot>=12.0.0' &> /dev/null | |
pyfiles=( | |
jobs_orig.py | |
jobs_fix.py | |
) | |
echo > $TRACE | |
(for pyfile in ${pyfiles[@]};do | |
echo -e "\n\n$pyfile:\n" | |
rm -rf $PICKLE | |
for step in 1 2 3;do | |
python3 $pyfile | |
done | |
echo -e "\n" | |
done) 2>&1 | tee -a $TRACE | |
rm -rf $PICKLE |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
jobs_orig.py: | |
Exception in thread Thread-1: | |
Traceback (most recent call last): | |
File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner | |
self.run() | |
File "/usr/lib/python3.7/threading.py", line 870, in run | |
self._target(*self._args, **self._kwargs) | |
File "jobs_orig.py", line 130, in cancel | |
jobs[3.2].schedule_removal() | |
File "/home/cristian/WORK/3/venv/lib/python3.7/site-packages/telegram/ext/jobqueue.py", line 437, in schedule_removal | |
self._remove.set() | |
AttributeError: 'bool' object has no attribute 'set' | |
[ 0.00] START | |
[ 2.00] message_job 2 | |
[ 4.20] save_jobs message_job, message_job, message_job, message_job | |
[ 4.20] message_job 3.2 | |
[ 4.20] message_job 4 | |
[ 5.00] message_job 5 | |
[ 6.30] save_jobs message_job | |
[ 8.61] save_jobs message_job, save_jobs_job | |
[ 8.61] END | |
[ 0.00] START | |
[ 0.00] load_jobs message_job, save_jobs_job | |
[ 0.00] message_job 8 | |
[ 1.97] save_jobs message_job, save_jobs_job, message_job, message_job, message_job, message_job | |
[ 2.00] message_job 2 | |
[ 4.50] save_jobs save_jobs_job, message_job, message_job, message_job, message_job | |
[ 6.01] save_jobs message_job, message_job, save_jobs_job, message_job, message_job | |
[ 6.01] message_job 4 | |
[ 6.01] message_job 5 | |
[ 6.61] save_jobs save_jobs_job, message_job | |
[ 7.91] save_jobs save_jobs_job, save_jobs_job, message_job | |
[ 7.91] END | |
[ 0.00] START | |
[ 0.00] load_jobs save_jobs_job, save_jobs_job, message_job | |
[ 2.41] save_jobs message_job, message_job, save_jobs_job, save_jobs_job, message_job, message_job, message_job, message_job | |
[ 2.41] message_job 8 | |
[ 4.51] save_jobs save_jobs_job, message_job, save_jobs_job, message_job, message_job, message_job, message_job | |
[ 6.61] save_jobs message_job, save_jobs_job, save_jobs_job, message_job, message_job, message_job, message_job | |
[ 6.61] message_job 2 | |
[10.11] save_jobs save_jobs_job, save_jobs_job, message_job, save_jobs_job, message_job, message_job, message_job | |
[10.11] END | |
jobs_fix.py: | |
[ 0.00] START | |
[ 2.00] message_job 2 | |
[ 4.20] save_jobs message_job, message_job, message_job, message_job | |
[ 4.20] message_job 4 | |
[ 5.00] message_job 5 | |
[ 6.30] save_jobs message_job | |
[ 7.31] save_jobs message_job | |
[ 7.31] END | |
[ 0.00] START | |
[ 0.00] load_jobs message_job | |
[ 0.48] message_job 8 | |
[ 2.00] message_job 2 | |
[ 4.20] save_jobs message_job, message_job, message_job, message_job | |
[ 4.20] message_job 4 | |
[ 5.00] message_job 5 | |
[ 6.30] save_jobs message_job | |
[ 7.31] save_jobs message_job | |
[ 7.31] END | |
[ 0.00] START | |
[ 0.00] load_jobs message_job | |
[ 0.47] message_job 8 | |
[ 2.00] message_job 2 | |
[ 4.20] save_jobs message_job, message_job, message_job, message_job | |
[ 4.20] message_job 4 | |
[ 5.00] message_job 5 | |
[ 6.30] save_jobs message_job | |
[ 8.31] save_jobs message_job | |
[ 8.31] END | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment