Last active
June 10, 2025 14:02
-
-
Save akira345/bc21d7ef511066c6b4ee6bddb504da57 to your computer and use it in GitHub Desktop.
nodeJS+MySQLコードサンプル
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Mysqlコンテナを起動 | |
| services: | |
| mysql: | |
| image: mysql:8 | |
| container_name: mysql_container | |
| environment: | |
| MYSQL_ROOT_PASSWORD: password | |
| MYSQL_DATABASE: test | |
| ports: | |
| - "3306:3306" | |
| volumes: | |
| - mysql_data:/var/lib/mysql | |
| volumes: | |
| mysql_data: | |
| driver: local |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const mysql = require("mysql2/promise"); | |
| /** | |
| * パラレルバルクINSERT処理用関数 | |
| * 渡されたコールバック関数を1000件単位でバルク処理します | |
| * さらにそれを内部で10コネクションで分散しますので、最大1万件同時に捌きます | |
| * @param {Array} items データオブジェクト配列 | |
| * @param {Function} mapFunction データ作成コールバック関数 | |
| * @param {Function} executeBulkInsert バルクインサート実行コールバック関数 | |
| */ | |
| async function parallelBulkInsert(items, mapFunction, executeBulkInsert) { | |
| const batchSize = 1000; // バルクインサートする最大値 | |
| const numParallelInserts = 10; // 同時に張るコネクション数 | |
| const pool = mysql.createPool({ | |
| uri: "mysql://root:password@localhost:3306/test", | |
| namedPlaceholders: true, | |
| waitForConnections: true, | |
| connectionLimit: numParallelInserts, | |
| queueLimit: 0, | |
| }); | |
| console.log("MySQLコネクションプール作成OK"); | |
| // 配列のバイト数を取得する関数 | |
| const calcArrayBytes = (array) => { | |
| const buffer = new ArrayBuffer(array.length); | |
| return buffer.byteLength; | |
| }; | |
| // チャンクをバルクインサートするヘルパー | |
| async function bulkInsertChunks(chunks) { | |
| await Promise.all( | |
| chunks.map(async (chunk) => { | |
| const connection = await pool.getConnection(); | |
| try { | |
| await connection.beginTransaction(); | |
| await executeBulkInsert(connection, chunk); | |
| await connection.commit(); | |
| } catch (err) { | |
| await connection.rollback(); | |
| throw err; | |
| } finally { | |
| connection.release(); | |
| } | |
| }) | |
| ); | |
| } | |
| try { | |
| const chunksArr = []; | |
| let dataChunk = []; | |
| for (const item of items) { | |
| dataChunk.push(mapFunction(item)); | |
| // チャンクサイズまたはバイト数制限で分割 | |
| if (dataChunk.length >= batchSize || calcArrayBytes(chunksArr) > 66073344) { | |
| chunksArr.push([...dataChunk]); | |
| dataChunk = []; | |
| // 最大10セット同時に実行 | |
| if (chunksArr.length >= numParallelInserts) { | |
| await bulkInsertChunks(chunksArr); | |
| chunksArr.length = 0; | |
| } | |
| } | |
| } | |
| // 残りデータの処理 | |
| if (dataChunk.length) { | |
| chunksArr.push([...dataChunk]); | |
| } | |
| if (chunksArr.length) { | |
| await bulkInsertChunks(chunksArr); | |
| } | |
| } catch (error) { | |
| console.error('パラレルバルクインサート処理に失敗:', error); | |
| throw error; | |
| } finally { | |
| await pool.end(); | |
| } | |
| } | |
| module.exports = { | |
| parallelBulkInsert, | |
| }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const mysql = require("mysql2/promise"); | |
| const { parallelBulkInsert } = require("./lib.js"); | |
| // Dockerを使用してMySQL DB を起動してダミーテーブルを作成し、テストデータを入れる | |
| // docker compose up -d | |
| // docker exec -it mysql_container mysql -uroot -ppassword test -e "CREATE TABLE IF NOT EXISTS users (id INT PRIMARY KEY, name VARCHAR(255), age INT)"; | |
| /** | |
| * 0~100の範囲でランダムな整数(年齢)を生成する | |
| * @returns {number} 0~100のランダムな整数 | |
| */ | |
| function getRandomAge() { | |
| return Math.floor(Math.random() * 101); | |
| } | |
| async function main() { | |
| // MySQLデータ取得 | |
| const pool = mysql.createPool({ | |
| uri: "mysql://root:password@localhost:3306/test", | |
| namedPlaceholders: true, // 名前付きプレースホルダを有効にする | |
| waitForConnections: true, // コネクションプールが満杯の時、エラーにせず空きが出るまで待機する | |
| connectionLimit: 10, // コネクションプールの最大数 | |
| queueLimit: 0, // コネクションプールが満杯のとき、待機できるリクエストの最大数。0 は無制限 | |
| }); | |
| console.log("MySQLコネクションプール作成OK"); | |
| try { | |
| // ダミーのデータをInsertする | |
| // トランザクションはコネクション単位でしか実行できないので、トランザクションを張るにはgetConnection()を使用してコネクションを取得し、 | |
| // beginTransaction()を呼び出す | |
| const conn = await pool.getConnection(); | |
| try { | |
| await conn.query("TRUNCATE TABLE users"); // テーブルを空にする | |
| console.log("テーブルを空にしました"); | |
| // トランザクション開始 | |
| await conn.beginTransaction(); | |
| /* | |
| // キー衝突テスト | |
| await conn.query( | |
| "INSERT INTO users (id,name, age) VALUES (:id,:name, :age)", | |
| { id: 16, name: "Charlie", age: 28 } | |
| ); | |
| await conn.query( | |
| "INSERT INTO users (id,name, age) VALUES (:id,:name, :age)", | |
| { id: 14, name: "Charlie", age: 28 } | |
| ); | |
| */ | |
| console.log("データ挿入完了"); | |
| await conn.commit(); | |
| } catch (e) { | |
| // エラーが発生した場合はロールバック | |
| await conn.rollback(); | |
| console.error("データ挿入中にエラー:", e); | |
| } finally { | |
| // プールへコネクションを返却 | |
| conn.release(); | |
| } | |
| // 15秒間隔で10回データをSelectする(同期的に待つ) | |
| /* | |
| for (let i = 0; i < 10; i++) { | |
| // pool.query()、pool.execute()はcreateConnection()を内部で呼び出しているので、その時点でコネクションを取得する。 | |
| // また取得したコネクションの返却は自動的に行われる。 | |
| // そのため、実行前にpool.getConnection()を呼び出す必要はないし、コネクションを明示的に返却する必要もない。 | |
| const [rows] = await pool.query("SELECT count(*) FROM users"); | |
| console.log(`データ取得(${i + 1}回目):`, rows); | |
| if (i < 9) { | |
| // この間に接続が切れても再接続される | |
| await new Promise((resolve) => setTimeout(resolve, 15000)); | |
| } | |
| } | |
| */ | |
| // パラレルバルクインサート用データ作成 | |
| const insertUserData = []; | |
| for (let i = 0; i < 5000000; i++) { | |
| const recordObj = { | |
| id: i, | |
| name: `Charlie${i}`, | |
| age: getRandomAge(), | |
| }; | |
| insertUserData.push(recordObj); | |
| } | |
| /** | |
| * ユーザーデータ1件をバルクインサート用の値配列に変換する | |
| * @param {{id: number, name: string, age: number}} row - ユーザーデータ | |
| * @returns {Array} [id, name, age] | |
| */ | |
| const userDataMapRowToInsertValues = (row) => { | |
| return [row.id, row.name, row.age]; | |
| }; | |
| /** | |
| * バルクインサートを実行する | |
| * @param {import("mysql2/promise").PoolConnection} conn - MySQLコネクション | |
| * @param {Array} dataArray - バルクインサート用データ配列 | |
| * @returns {Promise<void>} | |
| */ | |
| const userDataExecuteBulkInsert = async (conn, dataArray) => { | |
| const strSQL = ` | |
| insert into users ( | |
| id, | |
| name, | |
| age | |
| ) values :data | |
| `; // バルクインサート時はパラメタは一つだけ(配列を1つ渡すので) | |
| const sqlParams = { data: dataArray }; | |
| // SQL実行 | |
| await conn.query(strSQL, sqlParams); | |
| }; | |
| // パラレルインサート実行 | |
| try { | |
| await parallelBulkInsert( | |
| insertUserData, | |
| userDataMapRowToInsertValues, | |
| userDataExecuteBulkInsert | |
| ); | |
| console.log("パラレルインサートOK"); | |
| } catch (e) { | |
| console.error("パラレルインサート中にエラー:", e); | |
| } | |
| const [rows] = await pool.query("SELECT count(*) FROM users"); | |
| console.log(`データ取得:`, rows); | |
| } finally { | |
| // プールを閉じる | |
| await pool.end(); | |
| console.log("MySQLコネクションプールを閉じました"); | |
| } | |
| } | |
| // main | |
| // 即時実行関数を定義 | |
| (async () => { | |
| try { | |
| await main(); | |
| } catch (e) { | |
| // Deal with the fact the chain failed | |
| console.log("エラー発生!"); | |
| console.log(e); | |
| } | |
| })(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment