@schcriher
Last active February 18, 2020 07:42
Persistence of jobs - PTB (python-telegram-bot)
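The two Python files below implement the same idea in two ways: jobs_fix.py serializes only the data and state of each job (under the job queue's mutex), while jobs_orig.py pickles the Job objects themselves. Both are wrapped in a self-contained test harness (fake token, patched network calls, a scripted timeline). In a real bot the persistence helpers would be wired in roughly as in the following sketch. This is a minimal sketch, not part of the gist: it assumes the helpers are kept in an importable jobs_fix module, a real bot token, and a polling bot; the 60-second save interval is an arbitrary example value (the harness uses 3 seconds).

# Minimal wiring sketch (assumption: the helpers live in an importable jobs_fix module):
# restore jobs at startup, save them periodically while running, save once more after shutdown.
from datetime import timedelta

from telegram.ext import Updater

from jobs_fix import load_jobs, save_jobs, save_jobs_job  # helpers defined below

updater = Updater('TOKEN', use_context=True)  # real bot token goes here
job_queue = updater.job_queue

# Periodically persist the job queue while the bot is running
job_queue.run_repeating(save_jobs_job, timedelta(seconds=60))

try:
    load_jobs(job_queue)  # restore jobs saved by a previous run
except FileNotFoundError:
    pass                  # first run: nothing to restore

updater.start_polling()
updater.idle()

# Persist one last time after the bot has been shut down cleanly
save_jobs(job_queue)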

jobs_fix.py:

import os
import sys
import pickle

from threading import Thread, Event
from time import time, sleep
from datetime import timedelta

from telegram import Update
from telegram.ext import Updater, Filters, CommandHandler, MessageHandler, Job

INI = time()

JOBS_PICKLE = 'job_tuples.pickle'
JOB_DATA = ('callback', 'interval', 'repeat', 'context', 'days', 'name', 'tzinfo')
JOB_STATE = ('_remove', '_enabled')


def load_jobs(jq):
    names = []
    with open(JOBS_PICKLE, 'rb') as fp:
        while True:
            try:
                next_t, data, state = pickle.load(fp)
            except EOFError:
                break  # Loaded all jobs

            # New object with the same data
            job = Job(**{var: val for var, val in zip(JOB_DATA, data)})

            # Restore the state it had
            for var, val in zip(JOB_STATE, state):
                attribute = getattr(job, var)
                getattr(attribute, 'set' if val else 'clear')()

            job.job_queue = jq

            next_t -= time()  # Convert from absolute to relative time

            jq._put(job, next_t)
            names.append(job.name)
    print(f'[{time() - INI:5.2f}] load_jobs {", ".join(names)}')


def save_jobs(jq):
    names = []
    with jq._queue.mutex:  # in case job_queue makes a change

        if jq:
            job_tuples = jq._queue.queue
        else:
            job_tuples = []

        with open(JOBS_PICKLE, 'wb') as fp:
            for next_t, job in job_tuples:

                # This job is always created at the start
                if job.name == 'save_jobs_job':
                    continue

                data = tuple(getattr(job, var) for var in JOB_DATA)
                state = tuple(getattr(job, var).is_set() for var in JOB_STATE)

                sleep(0.3)  # allow the collision of threads (not necessary here)

                # Pickle the job
                pickle.dump((next_t, data, state), fp)
                names.append(job.name)
    print(f'[{time() - INI:5.2f}] save_jobs {", ".join(names)}')


def save_jobs_job(context):
    save_jobs(context.job_queue)


def message_job(context):
    print(f'[{time() - INI:5.2f}] message_job {context.job.context}')


def main():
    print(f'[{time() - INI:5.2f}] START')

    # ---TEST---------------------------------------------------------------------
    # Fake token
    updater = Updater('123:abc', use_context=True)

    # Patch for "get_me" and "start_webhook"
    ret = b'{"ok":1,"result":{"id":1,"is_bot":1,"username":"b","first_name":"B"}}'
    updater.bot._request._request_wrapper = lambda *a, **b: ret
    updater._start_webhook = lambda *a, **b: None
    # ----------------------------------------------------------------------------

    job_queue = updater.job_queue

    # Periodically save jobs
    job_queue.run_repeating(save_jobs_job, timedelta(seconds=3))

    try:
        load_jobs(job_queue)
    except FileNotFoundError:
        # First run
        pass

    # ---TEST---------------------------------------------------------------------
    # timeline          0   1   2   3   4   5   6   7   8
    #                   +---+---+---+---+---+---+-|-+---+
    # save_jobs_job                 *           * |
    # message_job               *    *  *   *     |     *
    # stop                                        *
    jobs = {}
    for i in (2, 3.2, 4, 5, 8):
        jobs[i] = job_queue.run_once(message_job, timedelta(seconds=i), context=i)

    def cancel():
        sleep(3.1)
        jobs[3.2].schedule_removal()

    def stop():
        sleep(6.5)
        updater.stop()
        updater.is_idle = False  # updater.stop() does not do it 😑

    Thread(target=cancel).start()
    Thread(target=stop).start()
    # ----------------------------------------------------------------------------

    updater.start_webhook()
    updater.idle()

    # Run again after bot has been properly shut down
    save_jobs(job_queue)

    print(f'[{time() - INI:5.2f}] END\n')


if __name__ == '__main__':
    main()
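
jobs_orig.py, below, is the earlier approach: it pickles the Job objects themselves and, while dumping, temporarily swaps the un-pickleable threading.Event fields (_remove, _enabled) for booleans, restoring them afterwards. If another thread calls schedule_removal() inside that window, it calls .set() on a boolean instead of an Event, which is exactly the AttributeError: 'bool' object has no attribute 'set' recorded in trace.log further down. The following stripped-down, PTB-free sketch (the FakeJob class is hypothetical, not part of the gist or of python-telegram-bot) reproduces that race in isolation:

# Hypothetical reproduction of the race in jobs_orig.py: one thread swaps an
# Event for a bool while pickling, another thread still calls .set() on it.
import pickle
import time
from threading import Event, Thread

class FakeJob:                          # stand-in for telegram.ext.Job
    def __init__(self):
        self._remove = Event()

    def schedule_removal(self):         # what the cancel() thread calls
        self._remove.set()

job = FakeJob()

def save():
    backup = job._remove
    job._remove = backup.is_set()       # replace the Event with a bool
    time.sleep(0.3)                     # the vulnerable window
    pickle.dumps(job)                   # now the object is picklable
    job._remove = backup                # restore the Event

def cancel():
    time.sleep(0.1)                     # land inside the window above
    job.schedule_removal()              # AttributeError: 'bool' object has no attribute 'set'

threads = [Thread(target=save), Thread(target=cancel)]
for t in threads:
    t.start()
for t in threads:
    t.join()

jobs_fix.py avoids this by never mutating the live Job: it copies the relevant attributes into plain tuples while holding the job queue's mutex, and pickles the tuples instead of the object.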

jobs_orig.py:

import os
import sys
import pickle

from threading import Thread, Event
from time import time, sleep
from datetime import timedelta

from telegram import Update
from telegram.ext import Updater, Filters, CommandHandler, MessageHandler

INI = time()

JOBS_PICKLE = 'job_tuples.pickle'


def load_jobs(jq):
    names = []
    now = time()

    with open(JOBS_PICKLE, 'rb') as fp:
        while True:
            try:
                next_t, job = pickle.load(fp)
            except EOFError:
                break  # Loaded all job tuples

            # Create threading primitives
            enabled = job._enabled
            removed = job._remove

            job._enabled = Event()
            job._remove = Event()

            if enabled:
                job._enabled.set()

            if removed:
                job._remove.set()

            next_t -= now  # Convert from absolute to relative time

            jq._put(job, next_t)
            names.append(job.name)
    print(f'[{time() - INI:5.2f}] load_jobs {", ".join(names)}')


def save_jobs(jq):
    names = []
    if jq:
        job_tuples = jq._queue.queue
    else:
        job_tuples = []

    with open(JOBS_PICKLE, 'wb') as fp:
        for next_t, job in job_tuples:

            # Back up objects
            _job_queue = job._job_queue
            _remove = job._remove
            _enabled = job._enabled

            # Replace un-pickleable threading primitives
            job._job_queue = None  # Will be reset in jq.put
            job._remove = job.removed  # Convert to boolean
            job._enabled = job.enabled  # Convert to boolean

            sleep(0.3)  # allow the collision of threads

            # Pickle the job
            pickle.dump((next_t, job), fp)

            # Restore objects
            job._job_queue = _job_queue
            job._remove = _remove
            job._enabled = _enabled

            names.append(job.name)
    print(f'[{time() - INI:5.2f}] save_jobs {", ".join(names)}')


def save_jobs_job(context):
    save_jobs(context.job_queue)


def message_job(context):
    print(f'[{time() - INI:5.2f}] message_job {context.job.context}')


def main():
    print(f'[{time() - INI:5.2f}] START')

    # ---TEST---------------------------------------------------------------------
    # Fake token
    updater = Updater('123:abc', use_context=True)

    # Patch for "get_me" and "start_webhook"
    ret = b'{"ok":1,"result":{"id":1,"is_bot":1,"username":"b","first_name":"B"}}'
    updater.bot._request._request_wrapper = lambda *a, **b: ret
    updater._start_webhook = lambda *a, **b: None
    # ----------------------------------------------------------------------------

    job_queue = updater.job_queue

    # Periodically save jobs
    job_queue.run_repeating(save_jobs_job, timedelta(seconds=3))

    try:
        load_jobs(job_queue)
    except FileNotFoundError:
        # First run
        pass

    # ---TEST---------------------------------------------------------------------
    # timeline          0   1   2   3   4   5   6   7   8
    #                   +---+---+---+---+---+---+-|-+---+
    # save_jobs_job                 *           * |
    # message_job               *    *  *   *     |     *
    # stop                                        *
    jobs = {}
    for i in (2, 3.2, 4, 5, 8):
        jobs[i] = job_queue.run_once(message_job, timedelta(seconds=i), context=i)

    def cancel():
        sleep(3.1)
        jobs[3.2].schedule_removal()

    def stop():
        sleep(6.5)
        updater.stop()
        updater.is_idle = False  # updater.stop() does not do it 😑

    Thread(target=cancel).start()
    Thread(target=stop).start()
    # ----------------------------------------------------------------------------

    updater.start_webhook()
    updater.idle()

    # Run again after bot has been properly shut down
    save_jobs(job_queue)

    print(f'[{time() - INI:5.2f}] END\n')


if __name__ == '__main__':
    main()

#!/usr/bin/env bash

set -o errexit
set -o pipefail
set -o nounset
#set -o xtrace

VENV=venv
TRACE=trace.log
PICKLE=job_tuples.pickle

if [ ! -d $VENV ];then
    python3 -m venv $VENV
fi

source $VENV/bin/activate

python3 -m pip install 'python-telegram-bot>=12.0.0' &> /dev/null

pyfiles=(
    jobs_orig.py
    jobs_fix.py
)

echo > $TRACE

(for pyfile in ${pyfiles[@]};do
    echo -e "\n\n$pyfile:\n"
    rm -rf $PICKLE
    for step in 1 2 3;do
        python3 $pyfile
    done
    echo -e "\n"
done) 2>&1 | tee -a $TRACE

rm -rf $PICKLE

trace.log:


jobs_orig.py:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "jobs_orig.py", line 130, in cancel
    jobs[3.2].schedule_removal()
  File "/home/cristian/WORK/3/venv/lib/python3.7/site-packages/telegram/ext/jobqueue.py", line 437, in schedule_removal
    self._remove.set()
AttributeError: 'bool' object has no attribute 'set'
[ 0.00] START
[ 2.00] message_job 2
[ 4.20] save_jobs message_job, message_job, message_job, message_job
[ 4.20] message_job 3.2
[ 4.20] message_job 4
[ 5.00] message_job 5
[ 6.30] save_jobs message_job
[ 8.61] save_jobs message_job, save_jobs_job
[ 8.61] END

[ 0.00] START
[ 0.00] load_jobs message_job, save_jobs_job
[ 0.00] message_job 8
[ 1.97] save_jobs message_job, save_jobs_job, message_job, message_job, message_job, message_job
[ 2.00] message_job 2
[ 4.50] save_jobs save_jobs_job, message_job, message_job, message_job, message_job
[ 6.01] save_jobs message_job, message_job, save_jobs_job, message_job, message_job
[ 6.01] message_job 4
[ 6.01] message_job 5
[ 6.61] save_jobs save_jobs_job, message_job
[ 7.91] save_jobs save_jobs_job, save_jobs_job, message_job
[ 7.91] END

[ 0.00] START
[ 0.00] load_jobs save_jobs_job, save_jobs_job, message_job
[ 2.41] save_jobs message_job, message_job, save_jobs_job, save_jobs_job, message_job, message_job, message_job, message_job
[ 2.41] message_job 8
[ 4.51] save_jobs save_jobs_job, message_job, save_jobs_job, message_job, message_job, message_job, message_job
[ 6.61] save_jobs message_job, save_jobs_job, save_jobs_job, message_job, message_job, message_job, message_job
[ 6.61] message_job 2
[10.11] save_jobs save_jobs_job, save_jobs_job, message_job, save_jobs_job, message_job, message_job, message_job
[10.11] END


jobs_fix.py:

[ 0.00] START
[ 2.00] message_job 2
[ 4.20] save_jobs message_job, message_job, message_job, message_job
[ 4.20] message_job 4
[ 5.00] message_job 5
[ 6.30] save_jobs message_job
[ 7.31] save_jobs message_job
[ 7.31] END

[ 0.00] START
[ 0.00] load_jobs message_job
[ 0.48] message_job 8
[ 2.00] message_job 2
[ 4.20] save_jobs message_job, message_job, message_job, message_job
[ 4.20] message_job 4
[ 5.00] message_job 5
[ 6.30] save_jobs message_job
[ 7.31] save_jobs message_job
[ 7.31] END

[ 0.00] START
[ 0.00] load_jobs message_job
[ 0.47] message_job 8
[ 2.00] message_job 2
[ 4.20] save_jobs message_job, message_job, message_job, message_job
[ 4.20] message_job 4
[ 5.00] message_job 5
[ 6.30] save_jobs message_job
[ 8.31] save_jobs message_job
[ 8.31] END