Last active
August 31, 2025 03:16
-
-
Save naosim/301bc1bc91c4093ab11d22f1f7bbcaea to your computer and use it in GitHub Desktop.
MyQueue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // yocto-queue からの引用 | |
| // https://github.com/sindresorhus/yocto-queue | |
| /* | |
| How it works: | |
| `this.#head` is an instance of `Node` which keeps track of its current value and nests another instance of `Node` that keeps the value that comes after it. When a value is provided to `.enqueue()`, the code needs to iterate through `this.#head`, going deeper and deeper to find the last value. However, iterating through every single item is slow. This problem is solved by saving a reference to the last value as `this.#tail` so that it can reference it to add a new value. | |
| */ | |
| class Node { | |
| value; | |
| next; | |
| constructor(value) { | |
| this.value = value; | |
| } | |
| } | |
| export class Queue { | |
| #head; | |
| #tail; | |
| #size; | |
| constructor() { | |
| this.clear(); | |
| } | |
| enqueue(value) { | |
| const node = new Node(value); | |
| if (this.#head) { | |
| this.#tail.next = node; | |
| this.#tail = node; | |
| } else { | |
| this.#head = node; | |
| this.#tail = node; | |
| } | |
| this.#size++; | |
| } | |
| dequeue() { | |
| const current = this.#head; | |
| if (!current) { | |
| return; | |
| } | |
| this.#head = this.#head.next; | |
| this.#size--; | |
| return current.value; | |
| } | |
| peek() { | |
| if (!this.#head) { | |
| return; | |
| } | |
| return this.#head.value; | |
| // TODO: Node.js 18. | |
| // return this.#head?.value; | |
| } | |
| clear() { | |
| this.#head = undefined; | |
| this.#tail = undefined; | |
| this.#size = 0; | |
| } | |
| get size() { | |
| return this.#size; | |
| } | |
| *[Symbol.iterator]() { | |
| let current = this.#head; | |
| while (current) { | |
| yield current.value; | |
| current = current.next; | |
| } | |
| } | |
| * drain() { | |
| while (this.#head) { | |
| yield this.dequeue(); | |
| } | |
| } | |
| } | |
| // yocto-queue ここまで -------------------------------- | |
| /** @typedef QueueTask | |
| * @property {string} value enqueue時に指定した値 | |
| * @property {'new'|'ok'|'error'|'retry'} status 最終処理状態 | |
| * @property {Date} [date] 最終処理日時 | |
| * @property {Array<{status:'ok'|'error'|'retry', date:Date}>} results 処理結果の履歴 | |
| */ | |
| export class QueueTaskExecutor { | |
| queue = new Queue(); | |
| count = 0; | |
| get size() { | |
| return this.queue.size; | |
| } | |
| enqueue(value) { | |
| this.queue.enqueue({ value, status: 'new', results: [] }); | |
| } | |
| /** | |
| * | |
| * @param {(value:any, task:QueueTask)=>{status:'new'|'ok'|'error'|'retry'}} process | |
| * @param {*} options | |
| * @returns | |
| */ | |
| async execute(process, options = {}) { | |
| if (!options.getInterval) { | |
| options.getInterval = (task) => 1000; | |
| } | |
| if (!options.result) { | |
| options.result = (task, result) => { }; | |
| } | |
| if (!options.isWait) { | |
| options.isWait = (task) => false; | |
| } | |
| const task = this.queue.peek(); | |
| if (!task) {// queueにタスクがなくなったら終了 | |
| return; | |
| } | |
| setTimeout(async () => { | |
| if (options.isWait(task)) { | |
| this.execute(process, options); | |
| return; | |
| } | |
| this.queue.dequeue(); // キューから取り出す | |
| const date = new Date(); | |
| this.count++; | |
| const result = await process(task.value, task); | |
| result.count = this.count; | |
| result.date = date; | |
| task.status = result.status; | |
| task.date = result.date; | |
| task.results.push(result); | |
| options.result(task, result); | |
| if (result.status === 'retry') { | |
| this.queue.enqueue(task); // 再度キューに戻す | |
| } | |
| this.execute(process, options); | |
| }, options.getInterval(task)); | |
| } | |
| } | |
| QueueTaskExecutor.result = { | |
| ok: {status: 'ok'}, | |
| error: {status: 'error'}, | |
| retry: {status: 'retry'}, | |
| }; | |
| // サンプル実行 ------------------------------ | |
| const text = ` | |
| A | |
| B | |
| errortask | |
| C | |
| test | |
| `.trim(); | |
| async function sample() { | |
| var myqueue = new QueueTaskExecutor(); | |
| text.split('\n').forEach(line => myqueue.enqueue(line.trim())); | |
| await myqueue.execute(async (value, task) => { | |
| console.log(`process: ${value}`); | |
| if (value === 'errortask') { | |
| if (task.results.length < 2) { | |
| return QueueTaskExecutor.result.retry; | |
| } | |
| return QueueTaskExecutor.result.error; | |
| } | |
| return QueueTaskExecutor.result.ok; | |
| }, { | |
| isWait: (task) => { | |
| return false; | |
| // const now = new Date(); | |
| // return now.getMinutes() < 12; | |
| }, | |
| getInterval: (task) => { | |
| if (task.status === 'retry') { | |
| return 5 * 1000; // retryの場合は5秒後に再実行 | |
| } | |
| return 1 * 1000; // 通常は1秒後に実行 | |
| }, | |
| result: (task, result) => { | |
| const tasks = Array.from(myqueue.queue); | |
| const retryTasks = tasks.filter(v => v.status === 'retry'); | |
| const log = {task, progress: {count: result.count, date: task.date, remain: tasks.length, retry: retryTasks.length, task}}; | |
| console.log(JSON.stringify(log)); | |
| } | |
| }); | |
| } | |
| // if(true) { | |
| // // if(globalThis.window) { | |
| // await sample(); | |
| // } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { QueueTaskExecutor } from './QueueTaskExecutor.mjs'; | |
| import fs from 'node:fs'; | |
| import {execSync} from 'node:child_process'; | |
| // サンプル実行 ------------------------------ | |
| const inputFile = './input.txt'; | |
| const outputFile = './output.log'; | |
| const isWait = (task) => { | |
| return false; | |
| // const now = new Date(); | |
| // return now.getMinutes() < 12; | |
| } | |
| const getInterval = (task) => { | |
| if (task.status === 'retry') { | |
| return 5 * 1000; // retryの場合は5秒後に再実行 | |
| } | |
| return 1 * 1000; // 通常は1秒後に実行 | |
| } | |
| const text = fs.readFileSync('./input.txt', 'utf8'); | |
| var myqueue = new QueueTaskExecutor(); | |
| text.split('\n').forEach(line => myqueue.enqueue(line.trim())); | |
| await myqueue.execute(async (value, task) => { | |
| console.log(`process: ${value}`); | |
| const res = execSync(value).toString().trim(); // shellの実行 | |
| console.log(res); | |
| if (value.indexOf('errortask') != -1) { | |
| if (task.results.length < 2) { | |
| return {...QueueTaskExecutor.result.retry, res}; | |
| } | |
| return {...QueueTaskExecutor.result.error, res}; | |
| } | |
| return {...QueueTaskExecutor.result.ok, res}; | |
| }, { | |
| isWait, | |
| getInterval, | |
| result: (task, result) => { | |
| const tasks = Array.from(myqueue.queue); | |
| const retryTasks = tasks.filter(v => v.status === 'retry'); | |
| const log = {task, progress: {count: result.count, date: task.date, remain: tasks.length, retry: retryTasks.length, task}}; | |
| console.log(JSON.stringify(log)); | |
| fs.appendFileSync(outputFile, JSON.stringify(log) + '\n' ); | |
| } | |
| }); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment