Last active
September 19, 2023 11:28
-
-
Save wmakeev/46ccb3a6636fa743d2c79b6179ed6bb8 to your computer and use it in GitHub Desktop.
[CSV transform] #csv #stream #pipeline #zip
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { parse } from 'csv-parse/sync' | |
import { fetch } from 'undici' | |
/**
 * Downloads a CSV table and builds a lookup Map from one column to another.
 *
 * The CSV is parsed with its first row as the header (`columns: true`), and
 * empty lines are skipped. Rows where either column is missing or empty are
 * ignored. If a `fieldFrom` value occurs more than once, the last row wins
 * (standard `Map` constructor semantics).
 *
 * @param {string} matchTableCsvUrl URL of the CSV table to download
 * @param {string} fieldFrom header name of the key column
 * @param {string} fieldTo header name of the value column
 * @returns {Promise<Map<string, string>>} key-column value -> value-column value
 * @throws {Error} if the server answers with a non-2xx HTTP status
 */
export const getMatchMap = async (matchTableCsvUrl, fieldFrom, fieldTo) => {
  const response = await fetch(matchTableCsvUrl)
  // Without this check an HTML error page (404, 500, ...) would be parsed
  // as CSV and silently produce an empty or meaningless Map.
  if (!response.ok) {
    throw new Error(
      `Failed to fetch ${matchTableCsvUrl}: ${response.status} ${response.statusText}`
    )
  }
  const matchTableCsv = await response.text()

  /** @type {Record<string, string>[]} */
  const records = parse(matchTableCsv, {
    columns: true,
    skip_empty_lines: true
  })

  const result = new Map(
    records.flatMap(r => {
      const [valueFrom, valueTo] = [r[fieldFrom], r[fieldTo]]
      // Skip rows where either side is absent or an empty string.
      if (!valueFrom || !valueTo) return []
      return [[valueFrom, valueTo]]
    })
  )

  return result
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { parse } from 'csv-parse/sync' | |
import { fetch } from 'undici' | |
/**
 * Downloads a CSV table and returns the set of values found in one column.
 *
 * The CSV is parsed with its first row as the header (`columns: true`), and
 * empty lines are skipped. Only `null`/`undefined` values (e.g. the column is
 * absent) are excluded. Empty strings are kept as a member of the set.
 *
 * @param {string} tableCsvUrl URL of the CSV table to download
 * @param {string} field header name of the column to collect
 * @returns {Promise<Set<string>>} distinct values of `field`
 * @throws {Error} if the server answers with a non-2xx HTTP status
 */
export const getSet = async (tableCsvUrl, field) => {
  const response = await fetch(tableCsvUrl)
  // Without this check an HTML error page (404, 500, ...) would be parsed
  // as CSV and silently produce an empty or meaningless Set.
  if (!response.ok) {
    throw new Error(
      `Failed to fetch ${tableCsvUrl}: ${response.status} ${response.statusText}`
    )
  }
  const tableCsv = await response.text()

  /** @type {Record<string, string>[]} */
  const records = parse(tableCsv, {
    columns: true,
    skip_empty_lines: true
  })

  const result = new Set(
    records.flatMap(r => {
      const value = r[field]
      if (value == null) return []
      return [value]
    })
  )

  return result
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { createReadStream, createWriteStream } from "node:fs"; | |
import path from "node:path"; | |
import internal, { Transform } from "node:stream"; | |
import { pipeline } from "node:stream/promises"; | |
import { parse } from "csv-parse"; | |
import { stringify } from "csv-stringify"; | |
import iconv from "iconv-lite"; | |
import { transform } from "stream-transform"; | |
import unzip from "unzipper"; | |
import isNonNullable from "../typeguard/isNotNullable.js"; | |
class FilterTransform extends Transform { | |
/** | |
* @override | |
* @param {RecordFilter} filter | |
*/ | |
constructor(filter) { | |
super({ | |
readableObjectMode: true, | |
writableObjectMode: true, | |
}); | |
this.streamObjectsFilter = filter; | |
} | |
/** | |
* @override | |
* @param {Record<string, unknown>} chunk | |
* @param {BufferEncoding} _ | |
* @param {internal.TransformCallback} next | |
* @returns | |
*/ | |
_transform(chunk, _, next) { | |
if (this.streamObjectsFilter(chunk)) { | |
return next(null, chunk); | |
} | |
next(); | |
} | |
} | |
/**
 * Streams a CSV file through a record filter and a record transformer and
 * writes the result as a new CSV file.
 *
 * Stages: read -> [unzip, for `.zip` input] -> [iconv decode, when
 * `sourceEncoding` is set] -> parse (";"-delimited, trimmed, BOM-aware) ->
 * filter -> transform -> stringify (","-delimited, header row, BOM) -> write.
 * Records that csv-parse cannot parse are skipped (`skip_records_with_error`)
 * rather than failing the whole run.
 *
 * @param {string} srcFile path to the source CSV file, or to a `.zip` archive
 *   (any letter case) that is read via `unzipper.ParseOne()`
 * @param {string} destFile path of the output CSV file
 * @param {TransformConfig} config filter/transformer callbacks, optional
 *   source encoding and optional explicit column names
 * @returns {Promise<void>} resolves once the output has been fully written;
 *   rejects if any stage of the pipeline fails
 */
export async function transformCsv(srcFile, destFile, config) {
  const { transformer, filter, sourceEncoding, headers } = config;

  // Match the extension case-insensitively so that e.g. "EXPORT.ZIP" is
  // unpacked instead of being handed to the CSV parser as raw archive bytes.
  const isArchive = path.extname(srcFile).toLowerCase() === ".zip";
  const unzipTransform = isArchive ? unzip.ParseOne() : null;

  const decode = sourceEncoding ? iconv.decodeStream(sourceEncoding) : null;

  const csvParser = parse({
    delimiter: ";",
    // `true`: take column names from the first row; an explicit array of
    // names is passed through as-is.
    columns: headers === undefined ? true : headers,
    bom: true,
    // Values of duplicated column names are grouped instead of overwritten.
    group_columns_by_name: true,
    skip_records_with_error: true,
    trim: true,
  });

  // TODO: the documentation describes this event, but no error ever arrives here.
  csvParser.on("skip", (err) => {
    console.error(err.message);
  });

  const csvTransformer = transform(transformer);
  const csvFilter = new FilterTransform(filter);
  const csvStringifier = stringify({
    delimiter: ",",
    header: true,
    bom: true,
  });

  // Open the files only after every stage above was constructed: if one of
  // them throws synchronously (e.g. iconv-lite rejecting an unknown encoding),
  // no read-stream file descriptor is left open and no empty output is created.
  const csvInputStream = createReadStream(srcFile);
  const csvOutStream = createWriteStream(destFile);

  await pipeline(
    [
      csvInputStream,
      unzipTransform,
      decode,
      csvParser,
      csvFilter,
      csvTransformer,
      csvStringifier,
      csvOutStream,
    ].filter(isNonNullable)
  );
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Maps one parsed CSV record to the record that is written to the output. */
type RecordTransformer = (
  record: Record<string, unknown>
) => Record<string, unknown>;

/** Returns `true` to keep a parsed CSV record, `false` to drop it. */
type RecordFilter = (record: Record<string, unknown>) => boolean;

/** Options for `transformCsv`. */
interface TransformConfig {
  /**
   * NOTE(review): not read by `transformCsv`, which receives the output path
   * as a separate argument — confirm whether other callers rely on it.
   */
  destFile: string;
  transformer: RecordTransformer;
  filter: RecordFilter;
  /**
   * iconv-lite encoding name of the source file (e.g. "win1251"). When
   * omitted, no decoding stage is added to the pipeline.
   */
  sourceEncoding?: string;
  /** Explicit column names; when omitted the first CSV row is the header. */
  headers?: string[] | undefined;
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.