Skip to content

Instantly share code, notes, and snippets.

@naosim
Last active August 31, 2025 03:16
Show Gist options
  • Save naosim/301bc1bc91c4093ab11d22f1f7bbcaea to your computer and use it in GitHub Desktop.
Save naosim/301bc1bc91c4093ab11d22f1f7bbcaea to your computer and use it in GitHub Desktop.
MyQueue
// yocto-queue からの引用
// https://github.com/sindresorhus/yocto-queue
/*
How it works:
`this.#head` is an instance of `Node` which keeps track of its current value and nests another instance of `Node` that keeps the value that comes after it. When a value is provided to `.enqueue()`, the code needs to iterate through `this.#head`, going deeper and deeper to find the last value. However, iterating through every single item is slow. This problem is solved by saving a reference to the last value as `this.#tail` so that it can reference it to add a new value.
*/
class Node {
value;
next;
constructor(value) {
this.value = value;
}
}
export class Queue {
#head;
#tail;
#size;
constructor() {
this.clear();
}
enqueue(value) {
const node = new Node(value);
if (this.#head) {
this.#tail.next = node;
this.#tail = node;
} else {
this.#head = node;
this.#tail = node;
}
this.#size++;
}
dequeue() {
const current = this.#head;
if (!current) {
return;
}
this.#head = this.#head.next;
this.#size--;
return current.value;
}
peek() {
if (!this.#head) {
return;
}
return this.#head.value;
// TODO: Node.js 18.
// return this.#head?.value;
}
clear() {
this.#head = undefined;
this.#tail = undefined;
this.#size = 0;
}
get size() {
return this.#size;
}
*[Symbol.iterator]() {
let current = this.#head;
while (current) {
yield current.value;
current = current.next;
}
}
* drain() {
while (this.#head) {
yield this.dequeue();
}
}
}
// yocto-queue ここまで --------------------------------
/** @typedef QueueTask
* @property {string} value enqueue時に指定した値
* @property {'new'|'ok'|'error'|'retry'} status 最終処理状態
* @property {Date} [date] 最終処理日時
* @property {Array<{status:'ok'|'error'|'retry', date:Date}>} results 処理結果の履歴
*/
export class QueueTaskExecutor {
queue = new Queue();
count = 0;
get size() {
return this.queue.size;
}
enqueue(value) {
this.queue.enqueue({ value, status: 'new', results: [] });
}
/**
*
* @param {(value:any, task:QueueTask)=>{status:'new'|'ok'|'error'|'retry'}} process
* @param {*} options
* @returns
*/
async execute(process, options = {}) {
if (!options.getInterval) {
options.getInterval = (task) => 1000;
}
if (!options.result) {
options.result = (task, result) => { };
}
if (!options.isWait) {
options.isWait = (task) => false;
}
const task = this.queue.peek();
if (!task) {// queueにタスクがなくなったら終了
return;
}
setTimeout(async () => {
if (options.isWait(task)) {
this.execute(process, options);
return;
}
this.queue.dequeue(); // キューから取り出す
const date = new Date();
this.count++;
const result = await process(task.value, task);
result.count = this.count;
result.date = date;
task.status = result.status;
task.date = result.date;
task.results.push(result);
options.result(task, result);
if (result.status === 'retry') {
this.queue.enqueue(task); // 再度キューに戻す
}
this.execute(process, options);
}, options.getInterval(task));
}
}
QueueTaskExecutor.result = {
ok: {status: 'ok'},
error: {status: 'error'},
retry: {status: 'retry'},
};
// サンプル実行 ------------------------------
const text = `
A
B
errortask
C
test
`.trim();
async function sample() {
var myqueue = new QueueTaskExecutor();
text.split('\n').forEach(line => myqueue.enqueue(line.trim()));
await myqueue.execute(async (value, task) => {
console.log(`process: ${value}`);
if (value === 'errortask') {
if (task.results.length < 2) {
return QueueTaskExecutor.result.retry;
}
return QueueTaskExecutor.result.error;
}
return QueueTaskExecutor.result.ok;
}, {
isWait: (task) => {
return false;
// const now = new Date();
// return now.getMinutes() < 12;
},
getInterval: (task) => {
if (task.status === 'retry') {
return 5 * 1000; // retryの場合は5秒後に再実行
}
return 1 * 1000; // 通常は1秒後に実行
},
result: (task, result) => {
const tasks = Array.from(myqueue.queue);
const retryTasks = tasks.filter(v => v.status === 'retry');
const log = {task, progress: {count: result.count, date: task.date, remain: tasks.length, retry: retryTasks.length, task}};
console.log(JSON.stringify(log));
}
});
}
// if(true) {
// // if(globalThis.window) {
// await sample();
// }
import { QueueTaskExecutor } from './QueueTaskExecutor.mjs';
import fs from 'node:fs';
import {execSync} from 'node:child_process';
// サンプル実行 ------------------------------
const inputFile = './input.txt';
const outputFile = './output.log';
const isWait = (task) => {
return false;
// const now = new Date();
// return now.getMinutes() < 12;
}
const getInterval = (task) => {
if (task.status === 'retry') {
return 5 * 1000; // retryの場合は5秒後に再実行
}
return 1 * 1000; // 通常は1秒後に実行
}
const text = fs.readFileSync('./input.txt', 'utf8');
var myqueue = new QueueTaskExecutor();
text.split('\n').forEach(line => myqueue.enqueue(line.trim()));
await myqueue.execute(async (value, task) => {
console.log(`process: ${value}`);
const res = execSync(value).toString().trim(); // shellの実行
console.log(res);
if (value.indexOf('errortask') != -1) {
if (task.results.length < 2) {
return {...QueueTaskExecutor.result.retry, res};
}
return {...QueueTaskExecutor.result.error, res};
}
return {...QueueTaskExecutor.result.ok, res};
}, {
isWait,
getInterval,
result: (task, result) => {
const tasks = Array.from(myqueue.queue);
const retryTasks = tasks.filter(v => v.status === 'retry');
const log = {task, progress: {count: result.count, date: task.date, remain: tasks.length, retry: retryTasks.length, task}};
console.log(JSON.stringify(log));
fs.appendFileSync(outputFile, JSON.stringify(log) + '\n' );
}
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment