Skip to content

Instantly share code, notes, and snippets.

@wmakeev
Last active September 19, 2023 11:28
Show Gist options
  • Save wmakeev/46ccb3a6636fa743d2c79b6179ed6bb8 to your computer and use it in GitHub Desktop.
Save wmakeev/46ccb3a6636fa743d2c79b6179ed6bb8 to your computer and use it in GitHub Desktop.
[CSV transform] #csv #stream #pipeline #zip
import { parse } from 'csv-parse/sync'
import { fetch } from 'undici'
/**
 * Downloads a CSV table and builds a lookup Map from one of its columns to
 * another.
 *
 * The first CSV row is used as column names and empty lines are skipped.
 * Rows in which `fieldFrom` or `fieldTo` is missing or an empty string are
 * ignored. If the same key occurs in several rows, the last row wins
 * (standard `Map` construction semantics).
 *
 * @param {string} matchTableCsvUrl URL of the CSV table to download
 * @param {string} fieldFrom name of the column that provides the Map keys
 * @param {string} fieldTo name of the column that provides the Map values
 * @returns {Promise<Map<string, string>>}
 * @throws {Error} when the server answers with a non-2xx HTTP status
 */
export const getMatchMap = async (matchTableCsvUrl, fieldFrom, fieldTo) => {
  const response = await fetch(matchTableCsvUrl)

  // Read the body before checking the status so the response is always
  // consumed; an error page must not be parsed as if it were the table.
  const matchTableCsv = await response.text()
  if (!response.ok) {
    throw new Error(
      `Failed to fetch ${matchTableCsvUrl}: ${response.status} ${response.statusText}`
    )
  }

  /** @type {Record<string, string>[]} */
  const records = parse(matchTableCsv, {
    columns: true,
    skip_empty_lines: true
  })

  return new Map(
    records.flatMap(r => {
      const valueFrom = r[fieldFrom]
      const valueTo = r[fieldTo]
      // Drop rows where either side is missing or an empty string.
      if (!valueFrom || !valueTo) return []
      return [[valueFrom, valueTo]]
    })
  )
}
import { parse } from 'csv-parse/sync'
import { fetch } from 'undici'
/**
 * Downloads a CSV table and collects the distinct values of one column.
 *
 * The first CSV row is used as column names and empty lines are skipped.
 * Only `undefined`/`null` values are dropped.
 * NOTE(review): unlike `getMatchMap`, empty-string values are NOT filtered
 * out here, so the Set may contain `''` — confirm this is intended.
 *
 * @param {string} tableCsvUrl URL of the CSV table to download
 * @param {string} field name of the column whose values are collected
 * @returns {Promise<Set<string>>}
 * @throws {Error} when the server answers with a non-2xx HTTP status
 */
export const getSet = async (tableCsvUrl, field) => {
  const response = await fetch(tableCsvUrl)

  // Read the body before checking the status so the response is always
  // consumed; an error page must not be parsed as if it were the table.
  const tableCsv = await response.text()
  if (!response.ok) {
    throw new Error(
      `Failed to fetch ${tableCsvUrl}: ${response.status} ${response.statusText}`
    )
  }

  /** @type {Record<string, string>[]} */
  const records = parse(tableCsv, {
    columns: true,
    skip_empty_lines: true
  })

  return new Set(
    records.flatMap(r => {
      const value = r[field]
      if (value == null) return []
      return [value]
    })
  )
}
import { createReadStream, createWriteStream } from "node:fs";
import path from "node:path";
import internal, { Transform } from "node:stream";
import { pipeline } from "node:stream/promises";
import { parse } from "csv-parse";
import { stringify } from "csv-stringify";
import iconv from "iconv-lite";
import { transform } from "stream-transform";
import unzip from "unzipper";
import isNonNullable from "../typeguard/isNotNullable.js";
/**
 * Object-mode Transform stream that forwards only the records for which the
 * supplied predicate returns a truthy value; all other records are dropped.
 */
class FilterTransform extends Transform {
  /**
   * @param {RecordFilter} filter predicate called once per incoming record
   */
  constructor(filter) {
    super({
      readableObjectMode: true,
      writableObjectMode: true,
    });
    this.streamObjectsFilter = filter;
  }

  /**
   * @override
   * @param {Record<string, unknown>} chunk a single record
   * @param {BufferEncoding} _ unused (object mode)
   * @param {internal.TransformCallback} next
   * @returns {void}
   */
  _transform(chunk, _, next) {
    let keep;
    try {
      keep = this.streamObjectsFilter(chunk);
    } catch (err) {
      // Report predicate failures through the callback (the documented error
      // channel) so they surface as a stream error and pipeline() rejects,
      // instead of throwing synchronously into whoever wrote the chunk.
      next(err);
      return;
    }

    if (keep) {
      next(null, chunk);
    } else {
      next();
    }
  }
}
/**
 * Streams a CSV file through parse -> filter -> transform -> stringify and
 * writes the result to `destFile`.
 *
 * Input is `;`-delimited CSV. When `srcFile` has a `.zip` extension (any
 * letter case) it is unpacked with unzipper's `ParseOne()` called without a
 * pattern (presumably the first archive entry — see unzipper docs). When
 * `config.sourceEncoding` is set, the bytes are decoded with iconv-lite.
 * Output is `,`-delimited CSV with a header row and a BOM.
 *
 * Records are passed to `config.filter` BEFORE `config.transformer`, so the
 * filter sees the raw parsed record.
 *
 * @param {string} srcFile path to the source `.csv` or `.zip` file
 * @param {string} destFile path of the CSV file to write
 * @param {TransformConfig} config
 * @returns {Promise<void>} resolves once the whole pipeline has finished
 */
export async function transformCsv(srcFile, destFile, config) {
  const { transformer, filter, sourceEncoding, headers } = config;

  const csvInputStream = createReadStream(srcFile);

  // Compare case-insensitively so that e.g. "DATA.ZIP" is unpacked too.
  const isArchive = path.extname(srcFile).toLowerCase() === ".zip";
  const unzipTransform = isArchive ? unzip.ParseOne() : null;

  const decode = sourceEncoding ? iconv.decodeStream(sourceEncoding) : null;

  const csvParser = parse({
    delimiter: ";",
    // Without explicit headers the first row supplies the column names.
    columns: headers === undefined ? true : headers,
    bom: true,
    group_columns_by_name: true,
    skip_records_with_error: true,
    trim: true,
  });

  // TODO: the documentation describes this event, but in practice no error
  // arrives here. Guard the argument so the listener itself can never throw.
  csvParser.on("skip", (err) => {
    if (err) {
      console.error(err.message);
    }
  });

  const csvFilter = new FilterTransform(filter);
  const csvTransformer = transform(transformer);

  const csvStringifier = stringify({
    delimiter: ",",
    header: true,
    bom: true,
  });

  const csvOutStream = createWriteStream(destFile);

  // Optional stages (unzip, decode) are null when unused and are dropped here.
  await pipeline(
    [
      csvInputStream,
      unzipTransform,
      decode,
      csvParser,
      csvFilter,
      csvTransformer,
      csvStringifier,
      csvOutStream,
    ].filter(isNonNullable)
  );
}
/** Maps one parsed CSV record to the record that will be written out. */
type RecordTransformer = (
  record: Record<string, unknown>
) => Record<string, unknown>;

/** Returns `true` for records that should be kept. */
type RecordFilter = (record: Record<string, unknown>) => boolean;

/** Options for `transformCsv`. */
interface TransformConfig {
  /**
   * NOTE(review): not read by `transformCsv`, which takes the destination
   * path as a separate argument — confirm whether other callers need it.
   */
  destFile: string;
  /** Applied to every record that passed `filter`. */
  transformer: RecordTransformer;
  /** Records for which this returns `false` are dropped. */
  filter: RecordFilter;
  /**
   * Encoding of the source bytes, decoded with iconv-lite. Optional:
   * `transformCsv` skips decoding entirely when it is not set.
   */
  sourceEncoding?: "win1251" | undefined;
  /**
   * Column names to use instead of taking them from the file's first row.
   * When omitted, the first row is used as the header.
   */
  headers?: string[] | undefined;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment