/**
 * Fetch the ADO custom field "Classification" for Work Items and upsert it into DX.
 * Usage: node fetch_classification.js
 */
const fs = require('fs');
const path = require('path');
const { Client } = require('pg');
const axios = require('axios');
// ========== CONFIG ==========
const CONFIG = {
  // ADO
  ORG: 'your-org',
  PROJECT: 'your-project',
  ADO_PAT: process.env.AZDO_PAT || 'YOUR_PAT',
  FIELD_REFNAME: 'Custom.Classification', // confirm the reference name in ADO
  // Where candidate IDs come from: 'dx' uses the DX DB to list WI IDs,
  // 'ado' queries ADO directly by ChangedDate windows
  SOURCE_MODE: 'dx',
  // DX Postgres
  PG_CONN: 'postgresql://user:pass@host:5432/database',
  // Tables
  TARGET_TABLE: 'custom.ado_work_item_classifications',
  HISTORY_TABLE: 'custom.ado_work_item_classifications_history',
  // Incremental controls
  DEFAULT_SINCE: '2024-01-01T00:00:00Z', // starting point for ADO backfill if the table is empty
  BATCH_SIZE: 200, // WorkItemsBatch accepts at most 200 IDs per request
  WIQL_WINDOW_DAYS: 14, // window size when walking ADO by ChangedDate
  // Logging
  FAIL_CSV: path.resolve(__dirname, 'classification_failures.csv')
};
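/*
 * Assumed DDL for the two tables above — a sketch only, adjust types and schema
 * to your DX setup. The ON CONFLICT clause in upsertRows() requires a unique
 * constraint on work_item_id:
 *
 *   CREATE TABLE IF NOT EXISTS custom.ado_work_item_classifications (
 *     work_item_id       bigint PRIMARY KEY,
 *     project            text,
 *     classification_ref text,
 *     classification_val text,
 *     work_item_type     text,
 *     title              text,
 *     state              text,
 *     area_path          text,
 *     iteration_path     text,
 *     changed_date       timestamptz,
 *     row_created_at     timestamptz NOT NULL DEFAULT now(),
 *     row_updated_at     timestamptz NOT NULL DEFAULT now()
 *   );
 *
 *   CREATE TABLE IF NOT EXISTS custom.ado_work_item_classifications_history (
 *     work_item_id       bigint,
 *     project            text,
 *     classification_ref text,
 *     classification_val text,
 *     changed_date       timestamptz
 *   );
 */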
// ========== HTTP CLIENT ==========
const ado = axios.create({
  baseURL: `https://dev.azure.com/${encodeURIComponent(CONFIG.ORG)}`,
  auth: { username: '', password: CONFIG.ADO_PAT }, // ADO PATs use basic auth with an empty username
  headers: { 'Content-Type': 'application/json' },
  timeout: 120000
});
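// Optional: ADO rate-limits under sustained load (HTTP 429 with a Retry-After
// header). A minimal retry sketch you could wrap around the ado.post() calls
// below — a hypothetical helper, not wired in by default:
async function withRetry(fn, attempts = 3) {
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn();
    } catch (err) {
      const status = err.response?.status;
      // Give up on the last attempt, or on non-retryable client errors
      if (i === attempts - 1 || (status !== 429 && status < 500)) throw err;
      const retryAfter = Number(err.response?.headers?.['retry-after']) || 2 ** i;
      await new Promise(res => setTimeout(res, retryAfter * 1000));
    }
  }
}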
// ========== HELPERS ==========
function chunk(arr, size) {
  const out = [];
  for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size));
  return out;
}
function csvEscape(v) {
  if (v == null) return '';
  const s = String(v);
  if (/[",\n\r]/.test(s)) return `"${s.replace(/"/g, '""')}"`;
  return s;
}
async function getLastCapturedChangedDate(pg, table) {
  // Returns NULL when the table is empty, so main() can fall back to DEFAULT_SINCE
  const { rows } = await pg.query(`SELECT MAX(changed_date) AS max FROM ${table}`);
  return rows[0].max;
}
async function ensureFailureFileHeader(file) {
  if (!fs.existsSync(file)) {
    fs.writeFileSync(file, 'work_item_id,error\n', 'utf8');
  }
}
function appendFailure(file, id, err) {
  const msg = typeof err === 'string' ? err : (err?.message || JSON.stringify(err));
  fs.appendFileSync(file, `${csvEscape(id)},${csvEscape(msg)}\n`);
}
// ========== DX CANDIDATE IDS ==========
async function fetchCandidateIdsFromDX(pg, sinceIso) {
  // Adjust this query to your real source table; the example assumes a
  // custom.ado_work_items table you already ingest.
  const sql = `
    SELECT id AS work_item_id
    FROM custom.ado_work_items
    WHERE changed_date >= $1
    ORDER BY id
  `;
  const { rows } = await pg.query(sql, [sinceIso]);
  return rows.map(r => Number(r.work_item_id));
}
// ========== ADO CANDIDATE IDS ==========
function isoAddDays(iso, days) {
  return new Date(new Date(iso).getTime() + days * 86400000).toISOString();
}
async function fetchIdsFromADOByWindow(project, startIso, endIso) {
  const ids = [];
  const wiql = {
    query: `
      SELECT [System.Id]
      FROM workitems
      WHERE
        [System.TeamProject] = '${project.replace(/'/g, "''")}'
        AND [System.ChangedDate] >= '${startIso}'
        AND [System.ChangedDate] < '${endIso}'
      ORDER BY [System.Id]
    `
  };
  // timePrecision=true makes WIQL honor the time-of-day portion of the ISO
  // timestamps; without it, ChangedDate comparisons are day-granular.
  const url = `/${encodeURIComponent(project)}/_apis/wit/wiql?api-version=7.1-preview.2&timePrecision=true`;
  const { data } = await ado.post(url, wiql);
  for (const w of data.workItems || []) ids.push(w.id);
  return ids;
}
async function fetchCandidateIdsFromADO(project, sinceIso, windowDays) {
  // Walk ChangedDate in fixed windows: WIQL caps a single query at 20,000
  // results, so shrink WIQL_WINDOW_DAYS if one window can exceed that.
  const out = [];
  let cursor = sinceIso;
  const nowIso = new Date().toISOString();
  while (new Date(cursor) < new Date(nowIso)) {
    const next = isoAddDays(cursor, windowDays);
    const ids = await fetchIdsFromADOByWindow(project, cursor, next);
    out.push(...ids);
    cursor = next;
  }
  return Array.from(new Set(out)).sort((a, b) => a - b);
}
// ========== FETCH DETAILS FROM ADO ==========
async function fetchWorkItemsBatch(ids, fields) {
  if (ids.length === 0) return [];
  const chunks = chunk(ids, CONFIG.BATCH_SIZE);
  const out = [];
  for (const c of chunks) {
    const url = `/_apis/wit/workitemsbatch?api-version=7.1-preview.1`;
    // errorPolicy 'omit' skips deleted/missing IDs instead of failing the whole
    // batch; $expand must stay 'None' when an explicit field list is supplied.
    const body = { ids: c, $expand: 'None', fields, errorPolicy: 'omit' };
    const { data } = await ado.post(url, body);
    out.push(...(data.value || []));
  }
  return out;
}
// ========== UPSERT INTO DX ==========
async function upsertRows(pg, rows) {
  if (rows.length === 0) return;
  // Batch to stay under Postgres's 65,535 bind-parameter limit (10 params per row)
  for (const batch of chunk(rows, 1000)) {
    const sql = `
      INSERT INTO ${CONFIG.TARGET_TABLE} (
        work_item_id, project, classification_ref, classification_val,
        work_item_type, title, state, area_path, iteration_path, changed_date,
        row_created_at, row_updated_at
      )
      VALUES
      ${batch.map((_, i) =>
        `($${i * 10 + 1}, $${i * 10 + 2}, $${i * 10 + 3}, $${i * 10 + 4}, $${i * 10 + 5}, $${i * 10 + 6}, $${i * 10 + 7}, $${i * 10 + 8}, $${i * 10 + 9}, $${i * 10 + 10}, now(), now())`
      ).join(',\n')}
      ON CONFLICT (work_item_id) DO UPDATE SET
        project = EXCLUDED.project,
        classification_ref = EXCLUDED.classification_ref,
        classification_val = EXCLUDED.classification_val,
        work_item_type = EXCLUDED.work_item_type,
        title = EXCLUDED.title,
        state = EXCLUDED.state,
        area_path = EXCLUDED.area_path,
        iteration_path = EXCLUDED.iteration_path,
        changed_date = EXCLUDED.changed_date,
        row_updated_at = now()
    `;
    const params = [];
    for (const r of batch) {
      params.push(
        r.work_item_id,
        r.project,
        r.classification_ref,
        r.classification_val,
        r.work_item_type,
        r.title,
        r.state,
        r.area_path,
        r.iteration_path,
        r.changed_date
      );
    }
    await pg.query(sql, params);
  }
}
async function insertHistory(pg, rows) {
  if (rows.length === 0) return;
  // Batched for the same bind-parameter limit as upsertRows (5 params per row)
  for (const batch of chunk(rows, 1000)) {
    const sql = `
      INSERT INTO ${CONFIG.HISTORY_TABLE} (
        work_item_id, project, classification_ref, classification_val, changed_date
      )
      VALUES
      ${batch.map((_, i) =>
        `($${i * 5 + 1}, $${i * 5 + 2}, $${i * 5 + 3}, $${i * 5 + 4}, $${i * 5 + 5})`
      ).join(',\n')}
    `;
    const params = [];
    for (const r of batch) {
      params.push(
        r.work_item_id,
        r.project,
        r.classification_ref,
        r.classification_val,
        r.changed_date
      );
    }
    await pg.query(sql, params);
  }
}
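// Optional: the upsert and the history insert run as separate statements in
// main(). If you want them atomic, a minimal sketch using the same pg client:
//
//   await pg.query('BEGIN');
//   try {
//     await upsertRows(pg, upsertPayload);
//     await insertHistory(pg, historyRows);
//     await pg.query('COMMIT');
//   } catch (e) {
//     await pg.query('ROLLBACK');
//     throw e;
//   }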
// ========== MAIN ==========
async function main() {
  if (!CONFIG.ADO_PAT || CONFIG.ADO_PAT === 'YOUR_PAT') {
    console.error('Set AZDO_PAT or hardcode ADO_PAT in CONFIG.');
    process.exit(1);
  }
  await ensureFailureFileHeader(CONFIG.FAIL_CSV);
  const pg = new Client({ connectionString: CONFIG.PG_CONN });
  await pg.connect();
  // Determine the incremental starting point
  const last = await getLastCapturedChangedDate(pg, CONFIG.TARGET_TABLE);
  const sinceIso = last ? new Date(last).toISOString() : CONFIG.DEFAULT_SINCE;
  console.log(`Incremental since: ${sinceIso}`);
  // Candidate IDs
  let candidateIds = [];
  if (CONFIG.SOURCE_MODE === 'dx') {
    candidateIds = await fetchCandidateIdsFromDX(pg, sinceIso);
  } else {
    candidateIds = await fetchCandidateIdsFromADO(CONFIG.PROJECT, sinceIso, CONFIG.WIQL_WINDOW_DAYS);
  }
  console.log(`Candidate work items: ${candidateIds.length}`);
  if (candidateIds.length === 0) {
    await pg.end();
    console.log('No candidates. Done.');
    return;
  }
  // Fetch details from ADO
  const fields = [
    'System.Id',
    'System.Title',
    'System.State',
    'System.WorkItemType',
    'System.AreaPath',
    'System.IterationPath',
    'System.ChangedDate',
    'System.TeamProject', // project name
    CONFIG.FIELD_REFNAME  // the custom field
  ];
  const detailed = await fetchWorkItemsBatch(candidateIds, fields);
  const upsertPayload = [];
  for (const wi of detailed) {
    try {
      const f = wi.fields || {};
      upsertPayload.push({
        work_item_id: wi.id,
        project: f['System.TeamProject'] || CONFIG.PROJECT,
        classification_ref: CONFIG.FIELD_REFNAME,
        classification_val: f[CONFIG.FIELD_REFNAME] ?? null,
        work_item_type: f['System.WorkItemType'] || null,
        title: f['System.Title'] || null,
        state: f['System.State'] || null,
        area_path: f['System.AreaPath'] || null,
        iteration_path: f['System.IterationPath'] || null,
        changed_date: f['System.ChangedDate'] ? new Date(f['System.ChangedDate']).toISOString() : null
      });
    } catch (e) {
      appendFailure(CONFIG.FAIL_CSV, wi?.id, e);
    }
  }
  // Upsert, and record a history row only when the value is not null
  await upsertRows(pg, upsertPayload);
  const historyRows = upsertPayload.filter(r => r.classification_val != null);
  if (historyRows.length) await insertHistory(pg, historyRows);
  await pg.end();
  console.log(`Upserted rows: ${upsertPayload.length}`);
  console.log(`Failures logged to: ${CONFIG.FAIL_CSV}`);
}
main().catch(err => {
  console.error('Fatal error:', err.response?.data || err.message || err);
  process.exit(1);
});
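/*
 * Example run (fill in the CONFIG placeholders above first):
 *
 *   export AZDO_PAT=xxxxxxxxxxxx
 *   node fetch_classification.js
 *
 * Re-runs are incremental: the next invocation resumes from the MAX(changed_date)
 * already captured in TARGET_TABLE, falling back to DEFAULT_SINCE on an empty table.
 */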