Skip to content

Instantly share code, notes, and snippets.

@akira345
Last active June 10, 2025 14:02
Show Gist options
  • Save akira345/bc21d7ef511066c6b4ee6bddb504da57 to your computer and use it in GitHub Desktop.
Save akira345/bc21d7ef511066c6b4ee6bddb504da57 to your computer and use it in GitHub Desktop.
nodeJS+MySQLコードサンプル
# Mysqlコンテナを起動
services:
mysql:
image: mysql:8
container_name: mysql_container
environment:
MYSQL_ROOT_PASSWORD: password
MYSQL_DATABASE: test
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql
volumes:
mysql_data:
driver: local
const mysql = require("mysql2/promise");
/**
* パラレルバルクINSERT処理用関数
* 渡されたコールバック関数を1000件単位でバルク処理します
* さらにそれを内部で10コネクションで分散しますので、最大1万件同時に捌きます
* @param {Array} items データオブジェクト配列
* @param {Function} mapFunction データ作成コールバック関数
* @param {Function} executeBulkInsert バルクインサート実行コールバック関数
*/
async function parallelBulkInsert(items, mapFunction, executeBulkInsert) {
const batchSize = 1000; // バルクインサートする最大値
const numParallelInserts = 10; // 同時に張るコネクション数
const pool = mysql.createPool({
uri: "mysql://root:password@localhost:3306/test",
namedPlaceholders: true,
waitForConnections: true,
connectionLimit: numParallelInserts,
queueLimit: 0,
});
console.log("MySQLコネクションプール作成OK");
// 配列のバイト数を取得する関数
const calcArrayBytes = (array) => {
const buffer = new ArrayBuffer(array.length);
return buffer.byteLength;
};
// チャンクをバルクインサートするヘルパー
async function bulkInsertChunks(chunks) {
await Promise.all(
chunks.map(async (chunk) => {
const connection = await pool.getConnection();
try {
await connection.beginTransaction();
await executeBulkInsert(connection, chunk);
await connection.commit();
} catch (err) {
await connection.rollback();
throw err;
} finally {
connection.release();
}
})
);
}
try {
const chunksArr = [];
let dataChunk = [];
for (const item of items) {
dataChunk.push(mapFunction(item));
// チャンクサイズまたはバイト数制限で分割
if (dataChunk.length >= batchSize || calcArrayBytes(chunksArr) > 66073344) {
chunksArr.push([...dataChunk]);
dataChunk = [];
// 最大10セット同時に実行
if (chunksArr.length >= numParallelInserts) {
await bulkInsertChunks(chunksArr);
chunksArr.length = 0;
}
}
}
// 残りデータの処理
if (dataChunk.length) {
chunksArr.push([...dataChunk]);
}
if (chunksArr.length) {
await bulkInsertChunks(chunksArr);
}
} catch (error) {
console.error('パラレルバルクインサート処理に失敗:', error);
throw error;
} finally {
await pool.end();
}
}
module.exports = {
parallelBulkInsert,
};
const mysql = require("mysql2/promise");
const { parallelBulkInsert } = require("./lib.js");
// Dockerを使用してMySQL DB を起動してダミーテーブルを作成し、テストデータを入れる
// docker compose up -d
// docker exec -it mysql_container mysql -uroot -ppassword test -e "CREATE TABLE IF NOT EXISTS users (id INT PRIMARY KEY, name VARCHAR(255), age INT)";
/**
* 0~100の範囲でランダムな整数(年齢)を生成する
* @returns {number} 0~100のランダムな整数
*/
function getRandomAge() {
return Math.floor(Math.random() * 101);
}
async function main() {
// MySQLデータ取得
const pool = mysql.createPool({
uri: "mysql://root:password@localhost:3306/test",
namedPlaceholders: true, // 名前付きプレースホルダを有効にする
waitForConnections: true, // コネクションプールが満杯の時、エラーにせず空きが出るまで待機する
connectionLimit: 10, // コネクションプールの最大数
queueLimit: 0, // コネクションプールが満杯のとき、待機できるリクエストの最大数。0 は無制限
});
console.log("MySQLコネクションプール作成OK");
try {
// ダミーのデータをInsertする
// トランザクションはコネクション単位でしか実行できないので、トランザクションを張るにはgetConnection()を使用してコネクションを取得し、
// beginTransaction()を呼び出す
const conn = await pool.getConnection();
try {
await conn.query("TRUNCATE TABLE users"); // テーブルを空にする
console.log("テーブルを空にしました");
// トランザクション開始
await conn.beginTransaction();
/*
// キー衝突テスト
await conn.query(
"INSERT INTO users (id,name, age) VALUES (:id,:name, :age)",
{ id: 16, name: "Charlie", age: 28 }
);
await conn.query(
"INSERT INTO users (id,name, age) VALUES (:id,:name, :age)",
{ id: 14, name: "Charlie", age: 28 }
);
*/
console.log("データ挿入完了");
await conn.commit();
} catch (e) {
// エラーが発生した場合はロールバック
await conn.rollback();
console.error("データ挿入中にエラー:", e);
} finally {
// プールへコネクションを返却
conn.release();
}
// 15秒間隔で10回データをSelectする(同期的に待つ)
/*
for (let i = 0; i < 10; i++) {
// pool.query()、pool.execute()はcreateConnection()を内部で呼び出しているので、その時点でコネクションを取得する。
// また取得したコネクションの返却は自動的に行われる。
// そのため、実行前にpool.getConnection()を呼び出す必要はないし、コネクションを明示的に返却する必要もない。
const [rows] = await pool.query("SELECT count(*) FROM users");
console.log(`データ取得(${i + 1}回目):`, rows);
if (i < 9) {
// この間に接続が切れても再接続される
await new Promise((resolve) => setTimeout(resolve, 15000));
}
}
*/
// パラレルバルクインサート用データ作成
const insertUserData = [];
for (let i = 0; i < 5000000; i++) {
const recordObj = {
id: i,
name: `Charlie${i}`,
age: getRandomAge(),
};
insertUserData.push(recordObj);
}
/**
* ユーザーデータ1件をバルクインサート用の値配列に変換する
* @param {{id: number, name: string, age: number}} row - ユーザーデータ
* @returns {Array} [id, name, age]
*/
const userDataMapRowToInsertValues = (row) => {
return [row.id, row.name, row.age];
};
/**
* バルクインサートを実行する
* @param {import("mysql2/promise").PoolConnection} conn - MySQLコネクション
* @param {Array} dataArray - バルクインサート用データ配列
* @returns {Promise<void>}
*/
const userDataExecuteBulkInsert = async (conn, dataArray) => {
const strSQL = `
insert into users (
id,
name,
age
) values :data
`; // バルクインサート時はパラメタは一つだけ(配列を1つ渡すので)
const sqlParams = { data: dataArray };
// SQL実行
await conn.query(strSQL, sqlParams);
};
// パラレルインサート実行
try {
await parallelBulkInsert(
insertUserData,
userDataMapRowToInsertValues,
userDataExecuteBulkInsert
);
console.log("パラレルインサートOK");
} catch (e) {
console.error("パラレルインサート中にエラー:", e);
}
const [rows] = await pool.query("SELECT count(*) FROM users");
console.log(`データ取得:`, rows);
} finally {
// プールを閉じる
await pool.end();
console.log("MySQLコネクションプールを閉じました");
}
}
// main
// 即時実行関数を定義
(async () => {
try {
await main();
} catch (e) {
// Deal with the fact the chain failed
console.log("エラー発生!");
console.log(e);
}
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment