/**
 * fetch_work_item_parents.js
 *
 * Backfill: WorkItemLinks WIQL, partitioned by Source.ChangedDate windows using DATE precision (YYYY-MM-DD).
 * Incremental: WorkItems WIQL by ChangedDate, then workitemsbatch with relations expanded.
 * Writes: batched multi-row upserts into custom.ado_work_item_links.
 * Schema columns used: child_work_item_source_id, parent_work_item_source_id, relation_url.
 * MIN_ID is applied to both parent and child.
 * ADO HTTPS calls go through http://DX_PROXY_USER:DX_PROXY_PASS@proxy.getdx.net:80.
 * Dry run writes a SQL file instead of touching the database.
 */
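// Example invocation (a sketch: the env var names below are the ones this script
// reads, but every value shown is a placeholder, not a real credential):
//
//   DX_PG_URL=postgres://user:pass@host:5432/db \
//   ADO_PAT=xxxx DX_PROXY_USER=me DX_PROXY_PASS=secret \
//   MODE=backfill DRY_RUN=1 OUT_FILE=ado_links_inserts.sql \
//   node fetch_work_item_parents.js
//
// MODE defaults to "incremental"; DRY_RUN=1 writes SQL to OUT_FILE instead of upserting.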
const { Client } = require("pg");
const fs = require("fs");
const path = require("path");
const axios = require("axios");
// https-proxy-agent v7+ exports the class as a named export; destructuring also
// works on v5, where the class is attached to the default export.
const { HttpsProxyAgent } = require("https-proxy-agent");
// Required env
const { DX_PG_URL, ADO_PAT, DX_PROXY_USER, DX_PROXY_PASS } = process.env;
if (!DX_PG_URL || !ADO_PAT) {
  console.error("Set DX_PG_URL and ADO_PAT");
  process.exit(1);
}
if (!DX_PROXY_USER || !DX_PROXY_PASS) {
  console.error("Set DX_PROXY_USER and DX_PROXY_PASS");
  process.exit(1);
}

// Tunables
const MODE = (process.env.MODE || "incremental").toLowerCase();
const DRY_RUN = process.env.DRY_RUN === "1";
const OUT_FILE = process.env.OUT_FILE || "ado_links_inserts.sql";
const BATCH_SIZE = parseInt(process.env.BATCH_SIZE || "200", 10); // fetch chunk size for workitemsbatch (incremental)
const FIRST_RUN_LOOKBACK_DAYS = parseInt(process.env.FIRST_RUN_LOOKBACK_DAYS || "3650", 10);
const MIN_ID = parseInt(process.env.MIN_ID || "11339", 10);

// Write batching
const WRITE_BUFFER_SIZE = parseInt(process.env.WRITE_BUFFER_SIZE || "200", 10); // rows per upsert statement
const WRITE_FLUSH_MS = parseInt(process.env.WRITE_FLUSH_MS || "250", 10); // timer-based flush

// Pacing
const SLEEP_PER_2000_LINKS_MS = parseInt(process.env.SLEEP_PER_2000_LINKS_MS || "400", 10);
const SLEEP_BETWEEN_BATCHES_MS = parseInt(process.env.SLEEP_BETWEEN_BATCHES_MS || "300", 10);
const SLEEP_BETWEEN_PROJECTS_MS = parseInt(process.env.SLEEP_BETWEEN_PROJECTS_MS || "500", 10);

// Normalize DX connection string
const normalizePostgresURL = (url) =>
  url.startsWith("postgres://") ? url.replace("postgres://", "postgresql://") : url;

// Proxy agent for ADO HTTPS calls
const proxyUrl = `http://${encodeURIComponent(DX_PROXY_USER)}:${encodeURIComponent(DX_PROXY_PASS)}@proxy.getdx.net:80`;
const httpsAgent = new HttpsProxyAgent(proxyUrl);
// Axios wrapper with retry and WIQL cap handling
let httpCalls = 0;
async function httpJson(url, { method = "GET", headers = {}, body = undefined }, attempt = 1) {
  httpCalls += 1;
  try {
    const res = await axios({
      url,
      method,
      headers,
      data: body,
      httpsAgent,
      proxy: false,
      timeout: 30000,
      validateStatus: () => true
    });
    if (res.status >= 200 && res.status < 300) return res.data;
    if (res.status === 400 && /WorkItemTrackingQueryResultSizeLimitExceeded/i.test(JSON.stringify(res.data || {}))) {
      const e = new Error("WIQL_LINKS_RESULT_CAP");
      e.code = "WIQL_LINKS_RESULT_CAP";
      throw e;
    }
    if ((res.status === 429 || (res.status >= 500 && res.status < 600)) && attempt < 6) {
      const ra = parseInt(res.headers["retry-after"] || "0", 10);
      const ms = ra > 0 ? ra * 1000 : attempt * 1000;
      await new Promise(r => setTimeout(r, ms));
      return httpJson(url, { method, headers, body }, attempt + 1);
    }
    throw new Error(`HTTP ${res.status}: ${res.statusText} | ${typeof res.data === "string" ? res.data : JSON.stringify(res.data)}`);
  } catch (err) {
    if (err && err.code === "WIQL_LINKS_RESULT_CAP") throw err;
    if (attempt < 6) {
      const ms = attempt * 1000;
      await new Promise(r => setTimeout(r, ms));
      return httpJson(url, { method, headers, body }, attempt + 1);
    }
    throw err;
  }
}
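// Retry behavior in brief: 429 and 5xx responses honor Retry-After when present,
// otherwise back off linearly (attempt * 1000 ms) for up to 5 retries; a 400 carrying
// WorkItemTrackingQueryResultSizeLimitExceeded is surfaced as WIQL_LINKS_RESULT_CAP
// so callers can split their date window instead of retrying. For example, a 429 with
// "Retry-After: 7" sleeps 7000 ms; a bare network error on attempt 2 sleeps 2000 ms.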
const AUTH_HEADER = { Authorization: "Basic " + Buffer.from(":" + ADO_PAT).toString("base64") };

// Projects query
const ONE_READ_SQL = `
  SELECT
    ao.name AS organization_name,
    ap.name AS project_name,
    MAX(l.row_created_at) AS last_ingest
  FROM ado_projects ap
  JOIN ado_organizations ao
    ON ap.organization_id::bigint = ao.id
  LEFT JOIN custom.ado_work_item_links l
    ON true
  WHERE ap.api_accessible = TRUE
    AND ap.allowlisted = TRUE
    AND ao.api_accessible = TRUE
  GROUP BY ao.name, ap.name
  ORDER BY ao.name, ap.name;
`;
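// Each row destructures below as { organization_name, project_name, last_ingest }.
// Note the LEFT JOIN ... ON true: last_ingest is the global MAX(row_created_at) over
// the links table (which has no project column), so every project shares one cursor.
// Hypothetical example row:
//   { organization_name: "contoso", project_name: "Web", last_ingest: 2025-09-01T00:00:00.000Z }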
// Helpers
function escapeSql(s) { return s.replace(/'/g, "''"); }
function parentIdFromUrl(url) { const n = parseInt(url.split("/").pop(), 10); return Number.isFinite(n) ? n : null; }
function chunk(arr, n) { const out = []; for (let i = 0; i < arr.length; i += n) out.push(arr.slice(i, i + n)); return out; }
async function sleep(ms) { return new Promise(r => setTimeout(r, ms)); }
function toWiqlDate(d) {
  const y = d.getUTCFullYear();
  const m = String(d.getUTCMonth() + 1).padStart(2, "0");
  const day = String(d.getUTCDate()).padStart(2, "0");
  return `${y}-${m}-${day}`;
}
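// Worked examples (inputs are illustrative):
//   toWiqlDate(new Date("2025-09-04T13:53:00Z"))                      -> "2025-09-04"
//   parentIdFromUrl("https://dev.azure.com/o/_apis/wit/workItems/42") -> 42
//   chunk([1, 2, 3, 4, 5], 2)                                         -> [[1, 2], [3, 4], [5]]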
// Build a one-row insert statement for the dry-run output file
function insertSql(childId, parentId, relUrl) {
  const urlSql = relUrl ? `'${escapeSql(relUrl)}'` : "NULL";
  return (
    "INSERT INTO custom.ado_work_item_links " +
    "(child_work_item_source_id, parent_work_item_source_id, relation_url) VALUES " +
    `(${childId}, ${parentId}, ${urlSql}) ` +
    "ON CONFLICT (child_work_item_source_id, parent_work_item_source_id) " +
    "DO UPDATE SET relation_url = EXCLUDED.relation_url;"
  );
}
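// Example output of insertSql(12001, 11999, "https://dev.azure.com/contoso/_apis/wit/workItems/11999")
// (IDs and org name are made up; the function emits this on one line, wrapped here for readability):
//   INSERT INTO custom.ado_work_item_links (child_work_item_source_id, parent_work_item_source_id, relation_url)
//   VALUES (12001, 11999, 'https://dev.azure.com/contoso/_apis/wit/workItems/11999')
//   ON CONFLICT (child_work_item_source_id, parent_work_item_source_id)
//   DO UPDATE SET relation_url = EXCLUDED.relation_url;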
// Buffered writer for live mode with multi-row upserts
function buildWriter(DRY_RUN, client, sqlLines) {
  if (DRY_RUN) {
    const writer = async (childId, parentId, relUrl) => {
      sqlLines.push(insertSql(childId, parentId, relUrl));
    };
    writer.flush = async () => {};
    return writer;
  }
  const buf = [];
  let flushTimer = null;
  async function flush() {
    if (buf.length === 0) return;
    const rows = buf.splice(0, buf.length);
    const values = [];
    const params = [];
    let p = 1;
    for (const r of rows) {
      values.push(`($${p++}, $${p++}, $${p++})`);
      params.push(r.childId, r.parentId, r.relUrl);
    }
    const sql = `
      INSERT INTO custom.ado_work_item_links
        (child_work_item_source_id, parent_work_item_source_id, relation_url)
      VALUES ${values.join(",")}
      ON CONFLICT (child_work_item_source_id, parent_work_item_source_id)
      DO UPDATE SET relation_url = EXCLUDED.relation_url
    `;
    await client.query(sql, params);
    await sleep(100); // small pause to smooth bursts
  }
  async function scheduleFlush() {
    if (flushTimer) return;
    flushTimer = setTimeout(async () => {
      flushTimer = null;
      try { await flush(); } catch (e) { console.error(e.message || String(e)); }
    }, WRITE_FLUSH_MS);
  }
  const writer = async (childId, parentId, relUrl) => {
    buf.push({ childId, parentId, relUrl });
    if (buf.length >= WRITE_BUFFER_SIZE) {
      await flush();
    } else {
      await scheduleFlush();
    }
  };
  writer.flush = flush;
  return writer;
}
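// With three buffered rows, flush builds a single statement of the form
// (parameters numbered left to right):
//   INSERT INTO custom.ado_work_item_links
//     (child_work_item_source_id, parent_work_item_source_id, relation_url)
//   VALUES ($1, $2, $3),($4, $5, $6),($7, $8, $9)
//   ON CONFLICT ... DO UPDATE SET relation_url = EXCLUDED.relation_url
// so a full WRITE_BUFFER_SIZE=200 buffer lands in one round trip with 600 parameters.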
/**
 * Backfill using WorkItemLinks with date-only windows.
 * Returns { total, kept, dropped } for metrics.
 */
async function backfillProject(org, project, writeFn) {
  const base = `https://dev.azure.com/${encodeURIComponent(org)}`;
  async function runLinksWiql(start, end) {
    const wiql = `
      SELECT [System.Id]
      FROM WorkItemLinks
      WHERE
        [Source].[System.TeamProject] = '${escapeSql(project)}'
        AND [Source].[System.ChangedDate] >= '${toWiqlDate(start)}'
        AND [Source].[System.ChangedDate] < '${toWiqlDate(end)}'
        AND [System.Links.LinkType] = 'System.LinkTypes.Hierarchy-Forward'
      MODE(Recursive)
    `;
    const url = `${base}/${encodeURIComponent(project)}/_apis/wit/wiql?api-version=7.1-preview.2`;
    const data = await httpJson(url, { method: "POST", headers: { ...AUTH_HEADER, "Content-Type": "application/json" }, body: JSON.stringify({ query: wiql }) });
    return data.workItemRelations || [];
  }
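  // A WorkItemLinks query returns workItemRelations entries shaped roughly like this
  // (IDs hypothetical; root items arrive with source: null and are skipped below):
  //   { "rel": "System.LinkTypes.Hierarchy-Forward",
  //     "source": { "id": 11999, "url": "https://dev.azure.com/o/_apis/wit/workItems/11999" },
  //     "target": { "id": 12001, "url": "https://dev.azure.com/o/_apis/wit/workItems/12001" } }
  // Hierarchy-Forward points parent -> child, which is why source becomes parentId
  // and target becomes childId in the loop below.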
  let total = 0, kept = 0, dropped = 0;
  const earliest = new Date("2005-01-01T00:00:00Z");
  const now = new Date();
  for (let d = new Date(earliest); d < now; ) {
    const end = new Date(d);
    end.setUTCDate(end.getUTCDate() + 180);
    if (end > now) end.setTime(now.getTime());
    let rels;
    try {
      rels = await runLinksWiql(d, end);
    } catch (e) {
      if (e && e.code === "WIQL_LINKS_RESULT_CAP") {
        const mid = new Date((d.getTime() + end.getTime()) / 2);
        const a = await backfillProjectWindow(org, project, writeFn, d, mid);
        const b = await backfillProjectWindow(org, project, writeFn, mid, end);
        total += a.total + b.total;
        kept += a.kept + b.kept;
        dropped += a.dropped + b.dropped;
        d = end;
        continue;
      }
      throw e;
    }
    for (const r of rels) {
      if (!r.source || !r.target) continue;
      total++;
      const parentId = r.source.id;
      const childId = r.target.id;
      if (!Number.isFinite(parentId) || !Number.isFinite(childId)) { dropped++; continue; }
      if (childId < MIN_ID || parentId < MIN_ID) { dropped++; continue; }
      const relUrl = `${base}/_apis/wit/workItems/${parentId}`;
      await writeFn(childId, parentId, relUrl);
      kept++;
      if (kept % 2000 === 0) await sleep(SLEEP_PER_2000_LINKS_MS);
    }
    d = end;
  }
  return { total, kept, dropped };

  // helper for split windows
  async function backfillProjectWindow(org, project, writeFn, start, end) {
    const base2 = `https://dev.azure.com/${encodeURIComponent(org)}`;
    const url = `${base2}/${encodeURIComponent(project)}/_apis/wit/wiql?api-version=7.1-preview.2`;
    const wiql = `
      SELECT [System.Id]
      FROM WorkItemLinks
      WHERE
        [Source].[System.TeamProject] = '${escapeSql(project)}'
        AND [Source].[System.ChangedDate] >= '${toWiqlDate(start)}'
        AND [Source].[System.ChangedDate] < '${toWiqlDate(end)}'
        AND [System.Links.LinkType] = 'System.LinkTypes.Hierarchy-Forward'
      MODE(Recursive)
    `;
    try {
      const data = await httpJson(url, { method: "POST", headers: { ...AUTH_HEADER, "Content-Type": "application/json" }, body: JSON.stringify({ query: wiql }) });
      const rels = data.workItemRelations || [];
      let t = 0, k = 0, dr = 0;
      for (const r of rels) {
        if (!r.source || !r.target) continue;
        t++;
        const parentId = r.source.id;
        const childId = r.target.id;
        if (!Number.isFinite(parentId) || !Number.isFinite(childId)) { dr++; continue; }
        if (childId < MIN_ID || parentId < MIN_ID) { dr++; continue; }
        const relUrl = `${base2}/_apis/wit/workItems/${parentId}`;
        await writeFn(childId, parentId, relUrl);
        k++;
        if (k % 2000 === 0) await sleep(SLEEP_PER_2000_LINKS_MS);
      }
      return { total: t, kept: k, dropped: dr };
    } catch (e) {
      if (e && e.code === "WIQL_LINKS_RESULT_CAP") {
        const mid = new Date((start.getTime() + end.getTime()) / 2);
        const left = await backfillProjectWindow(org, project, writeFn, start, mid);
        const right = await backfillProjectWindow(org, project, writeFn, mid, end);
        return { total: left.total + right.total, kept: left.kept + right.kept, dropped: left.dropped + right.dropped };
      }
      throw e;
    }
  }
}
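// Window splitting in practice (dates hypothetical): if the 180-day window
// 2019-01-01 .. 2019-06-30 trips WorkItemTrackingQueryResultSizeLimitExceeded, it is
// split at the midpoint (~2019-04-01) and each half retried; halves that still exceed
// the cap keep splitting recursively. One caveat: because windows are rendered through
// toWiqlDate at DATE precision, a window already down to a single day cannot shrink
// further, so a day whose links alone exceed the cap would recurse without progress.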
// Incremental using ChangedDate WIQL + workitemsbatch with relations expanded
async function incrementalProject(org, project, sinceISO, writeFn) {
  const base = `https://dev.azure.com/${encodeURIComponent(org)}`;
  const wiql = `
    SELECT [System.Id]
    FROM WorkItems
    WHERE [System.TeamProject] = '${escapeSql(project)}'
      AND [System.ChangedDate] > '${sinceISO}'
    ORDER BY [System.ChangedDate] ASC
  `;
  // timePrecision=true is required here: sinceISO carries a time component, and the
  // WIQL endpoint rejects date literals that include a time under date precision.
  const idsRes = await httpJson(
    `${base}/${encodeURIComponent(project)}/_apis/wit/wiql?timePrecision=true&api-version=7.1-preview.2`,
    { method: "POST", headers: { ...AUTH_HEADER, "Content-Type": "application/json" }, body: JSON.stringify({ query: wiql }) }
  );
  const ids = (idsRes.workItems || []).map(w => w.id).filter(Number.isFinite);
  if (ids.length === 0) return { scanned: 0, links: 0 };
  let links = 0;
  for (const slice of chunk(ids, BATCH_SIZE)) {
    // workitemsbatch takes "$expand" (not "expand"), and fields cannot be combined
    // with relations expansion; the item id arrives top-level on each result anyway.
    const batch = await httpJson(
      `${base}/_apis/wit/workitemsbatch?api-version=7.1`,
      { method: "POST", headers: { ...AUTH_HEADER, "Content-Type": "application/json" }, body: JSON.stringify({ ids: slice, "$expand": "relations" }) }
    );
    for (const wi of batch.value || []) {
      const childId = wi.id;
      for (const rel of wi.relations || []) {
        if (rel.rel !== "System.LinkTypes.Hierarchy-Reverse" || !rel.url) continue;
        const parentId = parentIdFromUrl(rel.url);
        if (!parentId) continue;
        if (childId < MIN_ID || parentId < MIN_ID) continue;
        const relUrl = `${base}/_apis/wit/workItems/${parentId}`;
        await writeFn(childId, parentId, relUrl);
        links += 1;
      }
    }
    await sleep(SLEEP_BETWEEN_BATCHES_MS);
  }
  return { scanned: ids.length, links };
}
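// Sketch of one workitemsbatch response entry this loop consumes (IDs hypothetical):
//   { "id": 12001,
//     "relations": [
//       { "rel": "System.LinkTypes.Hierarchy-Reverse",
//         "url": "https://dev.azure.com/o/_apis/wit/workItems/11999" } ] }
// Hierarchy-Reverse is the child -> parent direction, so the item id is the child
// and the relation URL's trailing segment is the parent id.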
async function main() {
  const started = new Date();
  const client = new Client({ connectionString: normalizePostgresURL(DX_PG_URL), ssl: { rejectUnauthorized: false } });
  await client.connect();
  const { rows: scopes } = await client.query(ONE_READ_SQL);
  if (!scopes.length) {
    console.log("No API accessible and allowlisted projects found");
    await client.end();
    return;
  }
  const sqlLines = [];
  const writer = buildWriter(DRY_RUN, client, sqlLines);
  let totalLinksProcessed = 0;
  try {
    for (const { organization_name: org, project_name: project, last_ingest } of scopes) {
      if (!DRY_RUN) {
        await client.query("BEGIN");
        await client.query("SET LOCAL statement_timeout = '15s'");
        await client.query("SET LOCAL lock_timeout = '5s'");
        await client.query("SET LOCAL idle_in_transaction_session_timeout = '60s'");
      }
      try {
        if (MODE === "backfill") {
          const stats = await backfillProject(org, project, writer);
          totalLinksProcessed += stats.kept;
          if (!DRY_RUN && writer.flush) await writer.flush();
          if (!DRY_RUN) await client.query("COMMIT");
          console.log(`[backfill] ${org} / ${project}: total=${stats.total} kept=${stats.kept} dropped=${stats.dropped}`);
        } else if (MODE === "incremental") {
          const since = last_ingest
            ? new Date(last_ingest)
            : new Date(Date.now() - FIRST_RUN_LOOKBACK_DAYS * 86400 * 1000);
          const { scanned, links } = await incrementalProject(org, project, since.toISOString(), writer);
          totalLinksProcessed += links;
          if (!DRY_RUN && writer.flush) await writer.flush();
          if (!DRY_RUN) await client.query("COMMIT");
          console.log(`[incremental] ${org} / ${project}: scanned=${scanned} upserted=${links}`);
        } else {
          throw new Error(`Unknown MODE: ${MODE}`);
        }
      } catch (e) {
        if (!DRY_RUN) { try { await client.query("ROLLBACK"); } catch {} }
        throw e;
      }
      await sleep(SLEEP_BETWEEN_PROJECTS_MS);
    }
    if (DRY_RUN) {
      const header = ["-- Dry run output", "BEGIN;"].join("\n");
      const footer = ["-- Cursor not advanced in dry run", "COMMIT;"].join("\n");
      const file = path.resolve(process.cwd(), OUT_FILE);
      fs.writeFileSync(file, header + "\n" + sqlLines.join("\n") + "\n" + footer + "\n", "utf8");
      console.log(`Dry run file: ${file}`);
    }
  } catch (err) {
    console.error(err.message || String(err));
    process.exitCode = 1;
  } finally {
    await client.end();
    const ended = new Date();
    console.log(`HTTP calls: ${httpCalls}`);
    console.log(`Run time: ${Math.round((ended - started) / 1000)}s`);
    console.log(`Total links processed: ${totalLinksProcessed}.`);
  }
}

main();