Skip to content

Instantly share code, notes, and snippets.

@gsw945
Created April 2, 2019 04:18
Show Gist options
  • Save gsw945/15cbb71eaca5be66787a2c187414e36f to your computer and use it in GitHub Desktop.
apscheduler rpyc server and client demos
"""Client demo: schedule the service's test job over RPyC, then cancel it."""
from time import sleep

import rpyc

connection = rpyc.connect('localhost', 12345)
# Ask the remote scheduler service to queue its built-in test job.
test_job = connection.root.api_test_job(args=['Hello, World'])
# Give the remote scheduler a moment before removing the job again.
sleep(2)
connection.root.remove_job(test_job.id)
import logging
import time
from datetime import datetime, timedelta
import rpyc
from rpyc.utils.server import ThreadedServer
from rpyc.utils.helpers import classpartial
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import (
ThreadPoolExecutor,
ProcessPoolExecutor
)
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.events import (
EVENT_JOB_EXECUTED,
EVENT_JOB_ERROR,
EVENT_JOB_ADDED,
EVENT_JOB_SUBMITTED,
EVENT_JOB_REMOVED
)
class SchedulerService(rpyc.Service):
    """RPyC service that proxies job-management calls to the module-level
    APScheduler instance (the global ``scheduler`` created in ``__main__``).

    Every ``exposed_*`` method is callable by remote clients via
    ``conn.root.<name>`` (without the ``exposed_`` prefix).
    """

    def __init__(self, args=None, kwargs=None):
        # Demo only: echo whatever constructor arguments were bound in
        # via classpartial at server-setup time.
        print(args, kwargs)

    def on_connect(self, conn):
        # RPyC lifecycle hook: a client just connected.
        print('rpyc connect')

    def on_disconnect(self, conn):
        # RPyC lifecycle hook: a client just disconnected.
        print('rpyc disconnect')

    def exposed_add_job(self, func, *args, **kwargs):
        """Proxy for ``scheduler.add_job``; returns the created Job."""
        return scheduler.add_job(func, *args, **kwargs)

    def exposed_modify_job(self, job_id, jobstore=None, **changes):
        """Proxy for ``scheduler.modify_job``."""
        return scheduler.modify_job(job_id, jobstore, **changes)

    def exposed_reschedule_job(self, job_id, jobstore=None, trigger=None, **trigger_args):
        """Proxy for ``scheduler.reschedule_job``."""
        return scheduler.reschedule_job(job_id, jobstore, trigger, **trigger_args)

    def exposed_pause_job(self, job_id, jobstore=None):
        """Proxy for ``scheduler.pause_job``."""
        return scheduler.pause_job(job_id, jobstore)

    def exposed_resume_job(self, job_id, jobstore=None):
        """Proxy for ``scheduler.resume_job``."""
        return scheduler.resume_job(job_id, jobstore)

    def exposed_remove_job(self, job_id, jobstore=None):
        """Proxy for ``scheduler.remove_job`` (returns None)."""
        scheduler.remove_job(job_id, jobstore)

    def exposed_get_job(self, job_id):
        """Proxy for ``scheduler.get_job``."""
        return scheduler.get_job(job_id)

    def exposed_get_jobs(self, jobstore=None):
        """Proxy for ``scheduler.get_jobs``."""
        return scheduler.get_jobs(jobstore)

    def exposed_api_test_job(self, args=None):
        """Schedule a one-shot ``print`` job that fires five seconds from now.

        ``args`` is only echoed; the job prints a '~'-padded timestamp taken
        at scheduling time. Returns the created Job.
        """
        print(args)
        fire_at = datetime.now() + timedelta(seconds=5)
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        print(stamp)
        return scheduler.add_job(
            print,
            trigger='date',
            next_run_time=fire_at,
            args=[stamp.center(50, '~')]
        )
def event_listener(event, scheduler):
    """Print a short stdout record for interesting scheduler job events.

    Parameters:
        event: APScheduler event object; ``.code``, ``.job_id`` and
            ``.jobstore`` attributes are read.
        scheduler: the scheduler instance, used to look the job back up
            for events we recognize.
    """
    code = event.code
    job_id = event.job_id
    jobstore = event.jobstore
    code_names = {
        EVENT_JOB_EXECUTED: 'JOB_EXECUTED',
        EVENT_JOB_ERROR: 'JOB_ERROR',
        EVENT_JOB_ADDED: 'JOB_ADDED',
        EVENT_JOB_SUBMITTED: 'JOB_SUBMITTED',
        EVENT_JOB_REMOVED: 'JOB_REMOVED',
    }
    event_name = None
    job = None
    need_record = False
    if code in code_names:
        need_record = True
        event_name = code_names[code]
        job = scheduler.get_job(job_id)
        if code == EVENT_JOB_SUBMITTED:
            time.sleep(0.05)
        elif code in (EVENT_JOB_EXECUTED, EVENT_JOB_ERROR):
            # Executed and error events never fire for the same run,
            # so a single shared delay is enough.
            time.sleep(0.1)
    print('*' * 80)
    print(job_id)
    print(event_name, datetime.now())
    # print(job)
    # Disabled error-extraction sketch, kept for reference:
    # if hasattr(event, 'exception') and event.exception:
    #     error, *_ = event.exception.args
    #     tmp = re.sub(r'\([\w.]+\)\s*ORA-\d+:\s*', '', error)
    #     if bool(tmp):
    #         error = tmp
    #     # TODO: log error
def modify_logger(logger, log_file=None):
    """Attach a verbose UTF-8 file handler to *logger* and set it to DEBUG.

    Parameters:
        logger: the logging.Logger to modify (mutated in place).
        log_file: path of the log file; defaults to 'logger.log'.
    Returns:
        The same logger, for call-chaining.

    Record-attribute reference:
    https://docs.python.org/3.5/library/logging.html#logrecord-attributes
    """
    record_layout = '\n'.join([
        '[%(name)s] %(asctime)s.%(msecs)d',
        '\t%(pathname)s [line: %(lineno)d]',
        '\t%(processName)s[%(process)d] => %(threadName)s[%(thread)d] => %(module)s.%(filename)s:%(funcName)s()',
        '\t%(levelname)s: %(message)s\n',
    ])
    formatter = logging.Formatter(fmt=record_layout, datefmt='%Y-%m-%d %H:%M:%S')
    if log_file is None:
        log_file = 'logger.log'
    # stream_handler = logging.StreamHandler()
    # stream_handler.setFormatter(formatter)
    # logger.addHandler(stream_handler)
    handler = logging.FileHandler(log_file, mode='a', encoding='utf-8')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    return logger
if __name__ == '__main__':
    # Build a background scheduler backed by a local SQLite job store.
    scheduler = BackgroundScheduler({'apscheduler.timezone': 'Asia/Shanghai'})
    db_path = r'jobstore.sqlite'
    jobstores = {
        'default': SQLAlchemyJobStore(url='sqlite:///{0}'.format(db_path))
    }
    executors = {
        'default': ThreadPoolExecutor(40),
        'processpool': ProcessPoolExecutor(8)
    }
    scheduler.configure(jobstores=jobstores, executors=executors)
    # Record job lifecycle events via the listener defined above.
    scheduler.add_listener(
        lambda event: event_listener(event, scheduler),
        EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_ADDED | EVENT_JOB_SUBMITTED | EVENT_JOB_REMOVED
    )
    # Customize the scheduler's internal logger (verbose file output).
    scheduler._logger = modify_logger(scheduler._logger)
    # Start scheduling in the background.
    scheduler.start()
    # Extra constructor arguments for the service class.
    ser_args = ['test args']
    ser_kwargs = {}
    # Bind the constructor arguments into the Service before handing it to RPyC.
    service = classpartial(SchedulerService, *ser_args, **ser_kwargs)
    # Let clients access public attributes of objects returned over the wire.
    protocol_config = {'allow_public_attrs': True}
    # Network listen configuration.
    listen_config = {
        'port': 12345,
        'hostname': '0.0.0.0'
    }
    # Instantiate the RPyC server.
    server = ThreadedServer(service, protocol_config=protocol_config, **listen_config)
    print('rpyc server running at [{hostname}:{port}]'.format(**listen_config))
    try:
        # Serve until interrupted.
        server.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        # Stop the scheduler first, then the server.
        scheduler.shutdown()
        server.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment