Skip to content

Instantly share code, notes, and snippets.

@fzls
Last active June 3, 2024 09:49
Show Gist options
  • Save fzls/6029a5d28c37e7255cd99acf1f402177 to your computer and use it in GitHub Desktop.
Save fzls/6029a5d28c37e7255cd99acf1f402177 to your computer and use it in GitHub Desktop.
分享一个青龙面板的简单并发控制插件

声明

本方案首先是自用,然后才是顺带分享给大家,所以我爱咋写咋写,请勿过多指责。没有强迫任何人使用。

如介意示例方案中的日志中的署名,请不要原封不动的使用本脚本。

正文

现状

目前青龙面板的 task 命令自带一个并发参数,如下

echo -e "3.$cmd_task <file_name> conc <环境变量名称> <账号编号,空格分隔>(可选的)  # 并发执行,无论是否设置了随机延迟,均立即运行,前台不产生日志,直接记录在日志文件中,且可指定账号执行"

但是这个自带的方案有一个很大的瑕疵,他是按照传入的ck数目来同时启动对应数目的 job 来后台执行,每个 job 会启动一个 nodejs 实例,大约30MB,当ck数目比较多时,瞬间炸裂。

    local array=($(echo $envs | sed 's/&/ /g'))
    ...
    for i in "${!array[@]}"; do
        ...
        timeout -k 10s $command_timeout_time $which_program $first_param &>$single_log_path &
    done

事实上,我们并不需要同时异步执行每一个账号。只要能做到将ck分为若干组,然后分别异步执行就可以了。

思路分析

如果精通shell,那么直接改这个conc子命令就可以解决问题了,不巧的是,我基本不动shell<_<

仔细看task的使用说明,我们会注意到有另一个子命令 desi,可以指定对应列表内的账号来运行。

echo -e "4.$cmd_task <file_name> desi <环境变量名称> <账号编号,空格分隔>         # 指定账号执行,无论是否设置了随机延迟,均立即运行"

这时候,很容易想到的一个思路是,我们可以利用这个子功能,在调用它之前先将ck分组,然后拼接成若干个该子命令,也就可以实现我们的目的了。

如果活动的账号彼此无关,到这一步,其实我们的方案已经结束了。但是如果某些活动有互助流程,这时候就要根据脚本内互助的实现方式来分两种情况了。

如果脚本内是实时查询互助,那么按我们这种外部修改的方式,只能实现组内互助,各组之间会互不相关。如果非常希望实现与未分组前一样的内部互助效果,这时候只能大改对应活动脚本,在脚本内部去实现异步执行不同账号的流程,改动的工作量将大幅提升。有时间且有兴趣的朋友可以自行去尝试。

如果脚本是通过code.sh等方式预先生成了各个ck应当助力的对象,那么我们可以通过在生成子命令的时候,预先传入实际的账号范围信息,然后在nodejs某处(比如之前魔改的jdCookie.js)去获取互助码列表的时候,先把互助码列表转换为实际的范围,这样就能实现与未分组前一样的互助效果了。

最终方案

当然,这个功能有很多不同的实现方式。只是在我需要用这个功能来并发执行部分功能的时候,并没有看到能满足我以下需求的现成的工具,所以就自己倒腾了下。

  1. 可以通过便捷的方式使特定脚本开启该功能
  2. 可以全局指定并发数
  3. 可以便捷的指定特定脚本的并发数

最终有了下面这个 ptask 方案。

用法: 直接将本脚本路径附加到原有命令行之前
这样做的目的是,确保青龙的 runSingle 函数中不会重新给它加上task前缀,而导致使用task来实际触发本脚本

使用环境变量 PTASK_THREADS 定义并发数
ptask.py task /ql/config/ptask/demo.js

某个脚本单独设置并发数(比环境变量优先级高),比如下面表示单独设置并发数为2
ptask.py -t2 task /ql/config/ptask/demo.js

使用了一周,目前感觉还行,因此分享出来,大家有需求的话可以自行取用

image

image

# 风之凌殇
# 1. 并发
export PTASK_THREADS="3"
/*
================================================================================
魔改自 https://github.com/shufflewzc/faker2/blob/main/jdCookie.js
修改内容:与task_before.sh配合,由task_before.sh设置要设置要做互助的活动的 ShareCodeConfigName 和 ShareCodeEnvName 环境变量,
然后在这里实际解析/ql/log/.ShareCode中该活动对应的配置信息(由code.sh生成和维护),注入到nodejs的环境变量中
修改原因:原先的task_before.sh直接将互助信息注入到shell的env中,在ck超过45以上时,互助码环境变量过大会导致调用一些系统命令
(如date/cat)时报 Argument list too long,而在node中修改环境变量不会受这个限制,也不会影响外部shell环境,确保脚本可以正常运行
魔改作者:风之凌殇
================================================================================
此文件为Node.js专用。其他用户请忽略
*/
//此处填写京东账号cookie。
let CookieJDs = [
]
// 判断环境变量里面是否有京东ck
if (process.env.JD_COOKIE) {
if (process.env.JD_COOKIE.indexOf('&') > -1) {
CookieJDs = process.env.JD_COOKIE.split('&');
} else if (process.env.JD_COOKIE.indexOf('\n') > -1) {
CookieJDs = process.env.JD_COOKIE.split('\n');
} else {
CookieJDs = [process.env.JD_COOKIE];
}
}
if (JSON.stringify(process.env).indexOf('GITHUB')>-1) {
console.log(`请勿使用github action运行此脚本,无论你是从你自己的私库还是其他哪里拉取的源代码,都会导致我被封号\n`);
!(async () => {
await require('./sendNotify').sendNotify('提醒', `请勿使用github action、滥用github资源会封我仓库以及账号`)
await process.exit(0);
})()
}
CookieJDs = [...new Set(CookieJDs.filter(item => !!item))]
console.log(`\n====================共${CookieJDs.length}个京东账号Cookie=========\n`);
console.log(`==================脚本执行- 北京时间(UTC+8):${new Date(new Date().getTime() + new Date().getTimezoneOffset()*60*1000 + 8*60*60*1000).toLocaleString('zh', {hour12: false}).replace(' 24:',' 00:')}=====================\n`)
if (process.env.JD_DEBUG && process.env.JD_DEBUG === 'false') console.log = () => {};
for (let i = 0; i < CookieJDs.length; i++) {
if (!CookieJDs[i].match(/pt_pin=(.+?);/) || !CookieJDs[i].match(/pt_key=(.+?);/)) console.log(`\n提示:京东cookie 【${CookieJDs[i]}】填写不规范,可能会影响部分脚本正常使用。正确格式为: pt_key=xxx;pt_pin=xxx;(分号;不可少)\n`);
const index = (i + 1 === 1) ? '' : (i + 1);
exports['CookieJD' + index] = CookieJDs[i].trim();
}
// 以下为注入互助码环境变量(仅nodejs内起效)的代码
function SetShareCodesEnv(nameChinese = "", nameConfig = "", envName = "") {
let rawCodeConfig = {}
// 读取互助码
let shareCodeLogPath = `${process.env.QL_DIR}/log/.ShareCode/${nameConfig}.log`
let fs = require('fs')
if (fs.existsSync(shareCodeLogPath)) {
// 因为faker2目前没有自带ini,改用已有的dotenv来解析
// // 利用ini模块读取原始互助码和互助组信息
// let ini = require('ini')
// rawCodeConfig = ini.parse(fs.readFileSync(shareCodeLogPath, 'utf-8'))
// 使用env模块
require('dotenv').config({path: shareCodeLogPath})
rawCodeConfig = process.env
}
// 解析每个用户的互助码
let codes = {}
Object.keys(rawCodeConfig).forEach(function (key) {
if (key.startsWith(`My${nameConfig}`)) {
codes[key] = rawCodeConfig[key]
}
});
// 解析每个用户要帮助的互助码组,将用户实际的互助码填充进去
let helpOtherCodes = {}
Object.keys(rawCodeConfig).forEach(function (key) {
if (key.startsWith(`ForOther${nameConfig}`)) {
let helpCode = rawCodeConfig[key]
for (const [codeEnv, codeVal] of Object.entries(codes)) {
helpCode = helpCode.replace("${" + codeEnv + "}", codeVal)
}
helpOtherCodes[key] = helpCode
}
});
// 按顺序用&拼凑到一起,并放入环境变量,供目标脚本使用
let shareCodes = []
let leftIndex = 1, rightIndex = Object.keys(helpOtherCodes).length
// 判断是否是ptask并行触发,若是,则修改实际需要设置的互助码范围
let ptaskLeft = process.env.PTASK_LEFT
let ptaskRight = process.env.PTASK_RIGHT
if (ptaskLeft && ptaskRight) {
leftIndex = Number(ptaskLeft)
rightIndex = Number(ptaskRight)
}
for (let idx = leftIndex; idx <= rightIndex; idx++) {
shareCodes.push(helpOtherCodes[`ForOther${nameConfig}${idx}`])
}
let shareCodesStr = shareCodes.join('&')
process.env[envName] = shareCodesStr
let totalCodeCount = rightIndex - leftIndex + 1
console.info(`【风之凌殇】 友情提示:设置 ${nameChinese} 的 互助码环境变量 ${envName},共计 ${totalCodeCount} 组互助码,总大小为 ${shareCodesStr.length}`)
}
// 判断当前活动脚本是否在互助脚本列表中
function IsShareJsFile() {
// 尝试获取在task_before.sh中设置的 互助活动的脚本文件名的关键部分 列表
let rawJsNameList = process.env.ShareCodeJSNameList
if (!rawJsNameList) {
return false
}
// 转换为list
let jsNameList = process.env.ShareCodeJSNameList.split(" ")
// 判断当前
let currentActivityScriptFileName = GetCurrentActivityScriptFileName()
let isShareJsFile = false
for (let idx = 0; idx < jsNameList.length; idx++) {
if (currentActivityScriptFileName.includes(jsNameList[idx])) {
isShareJsFile = true
break
}
}
return isShareJsFile
}
// 获取当前活动脚本的文件名
function GetCurrentActivityScriptFileName() {
const path = require('path')
return path.basename(process.argv[1])
}
function randomString(e) {
e = e || 32;
let t = "abcdefhijkmnprstwxyz2345678",
a = t.length,
n = "";
for (i = 0; i < e; i++)
n += t.charAt(Math.floor(Math.random() * a));
return n
}
// 若在task_before.sh 中设置了要设置互助码环境变量的活动名称和环境变量名称信息,则在nodejs中处理,供活动使用
let nameChinese = process.env.ShareCodeConfigChineseName
let nameConfig = process.env.ShareCodeConfigName
let envName = process.env.ShareCodeEnvName
if (nameChinese && nameConfig && envName) {
SetShareCodesEnv(nameChinese, nameConfig, envName)
} else if (IsShareJsFile()) {
console.debug(`【风之凌殇】 友情提示:当前脚本为 ${GetCurrentActivityScriptFileName()},包含在互助脚本列表中,但未设置 ShareCodeConfigName 或 ShareCodeEnvName 环境变量,将不会尝试在nodejs中生成互助码的环境变量。ps: 两个值目前分别为 ${nameConfig} ${envName}`)
console.debug(`看不惯上面的署名,就自己把这行注释掉<_< 或者不用`)
}
// 增加打印队伍信息
// export JD_COOKIE_TEAM_INDEX=$j
// export JD_COOKIE_TEAM_START_INDEX=$start_index
// export JD_COOKIE_TEAM_END_INDEX=$end_index
let teamIndex = process.env.JD_COOKIE_TEAM_INDEX
let teamStartIndex = process.env.JD_COOKIE_TEAM_START_INDEX
let teamEndIndex = process.env.JD_COOKIE_TEAM_END_INDEX
if (teamIndex && teamStartIndex && teamEndIndex) {
console.info(`当前是组队模式,队伍序号为 ${teamIndex},cookie范围为 [${teamStartIndex}, ${teamEndIndex}]`)
}
// 打印ptask并行调用的一些信息
if (process.env.PTASK) {
console.info(`检查到ptask并行执行(Powered By 风之凌殇),并发数为 ${process.env.PTASK_FINAL_THREADS},每个脚本处理 ${process.env.PTASK_BATCH} 个账号,全局账号数总计 ${process.env.PTASK_TOTAL}`)
console.info(`当前脚本实例 ${randomString(4)} 处理账号范围 [${process.env.PTASK_LEFT}, ${process.env.PTASK_RIGHT}]`)
}
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# 用法: 直接将本脚本路径附加到原有命令行之前
# 这样做的目的是,确保青龙的 runSingle 函数中不会重新给它加上task前缀,而导致使用task来实际触发本脚本
#
# 使用环境变量 PTASK_THREADS 定义并发数
# ptask.py task /ql/config/ptask/demo.js
#
# 某个脚本单独设置并发数(比环境变量优先级高),比如下面表示单独设置并发数为2
# ptask.py -t2 task /ql/config/ptask/demo.js
#
#
import argparse
import datetime
import logging
import os
import subprocess
import sys
from typing import Optional
envs = {}
logger = None # type: Optional[logging.Logger]
def parallel_task(target_script: str, threads: int, jd_cookie_count: int):
account_per_thread = (jd_cookie_count + threads - 1) // threads
logger.debug(f"并发执行脚本, Powered By 风之凌殇")
logger.debug(f"使用 {threads} 并发")
logger.debug(f"共有 {jd_cookie_count} 个京东cookie")
logger.debug(f"每个并发任务处理 {account_per_thread} 个账号")
# 构建并发任务的cmd
# 比如下面是 使用三个进程并行执行13个账号的实际子任务命令列表
# PTASK_LEFT=1 PTASK_RIGHT=5 task /ql/config/ptask/demo.py desi JD_COOKIE 1 2 3 4 5
# PTASK_LEFT=6 PTASK_RIGHT=10 task /ql/config/ptask/demo.py desi JD_COOKIE 6 7 8 9 10
# PTASK_LEFT=11 PTASK_RIGHT=13 task /ql/config/ptask/demo.py desi JD_COOKIE 11 12 13
cmd_list = []
for idx in range(threads):
left = idx * account_per_thread + 1
right = min((idx + 1) * account_per_thread, jd_cookie_count)
batch_accounts = ' '.join(map(str, range(left, right + 1)))
cmd_list.append(f"PTASK=true PTASK_FINAL_THREADS={threads} PTASK_BATCH={account_per_thread} PTASK_TOTAL={jd_cookie_count} PTASK_LEFT={left} PTASK_RIGHT={right} task {target_script} desi JD_COOKIE {batch_accounts}")
logger.debug("将要执行以下任务")
for cmd in cmd_list:
logger.debug(f"\t{cmd}")
# 确保在执行前全部输出
logger.debug("")
task_list = [subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True) for cmd in cmd_list]
for task in task_list:
task.wait()
# logger.debug(task.stdout.read())
logger.debug("全部任务执行完毕")
def parse_args():
logger.debug(f"sys.argv={sys.argv}")
parse_envs()
parser = argparse.ArgumentParser()
parser.add_argument("task", type=str, help="只能填写task", choices=["task"])
parser.add_argument("target_script", type=str, help="实际要运行的脚本路径")
parser.add_argument("now", nargs="?", type=str, help="是否立即执行")
parser.add_argument("-t", "--threads", type=int, help="并发数", default=int(get_env('PTASK_THREADS', '3')))
parser.add_argument("--jd_cookie_count", type=int, help="京东cookie数量", default=int(get_env('JD_COOKIE', '').count('&') + 1))
args = parser.parse_args()
logger.debug(f"args={args}")
return args
def parse_envs():
# 加载 /ql/config/config.sh 和 /ql/config/env.sh 中定义的环境变量
global envs
for shell_file in [
'/ql/config/config.sh', '/ql/config/env.sh',
'../config.sh', '../env.sh',
]:
if not os.path.exists(shell_file):
continue
env_prefix = "export "
with open(shell_file, 'r', encoding='utf-8') as f:
for line in f.readlines():
if not line.startswith(env_prefix):
continue
line = line.replace(env_prefix, "")
key, value = line.split('=', maxsplit=1)
start_idx = value.find('"')
end_idx = value.rfind('"')
envs[key] = value[start_idx + 1:end_idx]
# for k, v in envs.items():
# print(f"{k}={v}")
def get_env(key: str, default="") -> str:
return envs.get(key, default)
def init_logger():
global logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.name = "ptask"
consoleHandler = logging.StreamHandler()
consoleHandler.setLevel(logging.INFO)
logger.addHandler(consoleHandler)
log_directory = "./logs"
if os.path.exists("/ql/log"):
log_directory = "/ql/log/ptask"
if not os.path.exists(log_directory):
os.makedirs(log_directory, exist_ok=True)
time_str = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
log_filename = f"{log_directory}/{time_str}.log"
fileHandler = logging.FileHandler(log_filename, encoding="utf-8", delay=True)
fileHandler.setLevel(logging.DEBUG)
fileHandler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(fileHandler)
def main():
init_logger()
args = parse_args()
parallel_task(args.target_script, args.threads, args.jd_cookie_count)
if __name__ == '__main__':
main()
@fzls
Copy link
Author

fzls commented Dec 5, 2021

@1135393 把下面的ptask.py改成你的脚本放在容器里的绝对路径,比如/ql/config/ptask.py

用法: 直接将本脚本路径附加到原有命令行之前
这样做的目的是,确保青龙的 runSingle 函数中不会重新给它加上task前缀,而导致使用task来实际触发本脚本

使用环境变量 PTASK_THREADS 定义并发数
ptask.py task /ql/config/ptask/demo.js

某个脚本单独设置并发数(比环境变量优先级高),比如下面表示单独设置并发数为2
ptask.py -t2 task /ql/config/ptask/demo.js

@1135393
Copy link

1135393 commented Dec 5, 2021

谢谢大佬, 我放在/ql/config/目录下了, 然后在配置文件-config,添加export PTASK_THREADS="3" 就OK了吧

@zz112211
Copy link

zz112211 commented Dec 5, 2021

2.10.2的青龙好像不能用...

@fzls
Copy link
Author

fzls commented Dec 6, 2021

@zz112211 青龙2.10.10之前的版本请自己修改部分流程,没精力针对每个版本都给出完整的使用流程。旧版本据我说知,应该只有以下区别

  1. 所有脚本在同一目录中
  2. 不支持通过deps自动分发依赖到各个脚本目录中

自行根据这些差异调整即可

@1491646020
Copy link

目前好像只知道这几步,加变量,把ptask.py移到config,更改jdcookie文件,还有其他步骤吗?

@1491646020
Copy link

2.10.10

@liuyichenchen
Copy link

脚本必须在config目录下吗?现在所有脚本都在scripts文件夹下面?

@xinchenmi
Copy link

大佬,青龙2.10.12不能用么

@fzls
Copy link
Author

fzls commented Jan 17, 2022

@xinchenmi 应该是可以的吧

@YoYulyvW
Copy link

@xinchenmi 应该是可以的吧

大佬有空弄个conc版本吗,例如传入-t5 按ck总数并发conc 1-5 6-10..这种🤣

@fzls
Copy link
Author

fzls commented Aug 16, 2022

conc 1-5 6-10..这种🤣

可以自己基于代码改一改,很简单的。
我没有这个需求,所以可见的时间内我应该不会改哈哈

@YoYulyvW
Copy link

我照着改了不过没改好🤣subprocess.Popen那里直接全部分段运行了,没有等待1-5执行完成再执行4-6

@fzls
Copy link
Author

fzls commented Aug 16, 2022

我照着改了不过没改好🤣subprocess.Popen那里直接全部分段运行了,没有等待1-5执行完成再执行4-6

你想要不并发执行的话,就用subprocess.call接口调用,这个接口是同步的,执行完才会继续下一个语句

@YoYulyvW
Copy link

call

明白了

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment