Created
October 19, 2023 15:03
-
-
Save identiq/7edc0fb21bee4c5accea618d7162e31c to your computer and use it in GitHub Desktop.
node:stream/promises
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dayjs from 'dayjs' | |
import { parse } from 'csv-parse' | |
import { fecs, type Company, type Fec } from './schema' | |
import { type Readable, Transform } from 'node:stream' | |
import { stringify } from 'csv-stringify' | |
import { drz, sqlClient } from './drizzle' | |
import { and, eq, sql } from 'drizzle-orm' | |
import { pipeline } from 'node:stream/promises' | |
import { GetObjectCommand } from '@aws-sdk/client-s3' | |
import { s3 } from '~/services/s3/s3.server' | |
import { CompanyStatus, FileStatus } from './enums' | |
import { formatFileDate, isRejected } from './utils' | |
import { FEC_COLUMNS } from './constants' | |
import { getFilesByCompanyId } from '~/models/company/get-file' | |
import { updateCompany } from '~/models/company/update-company' | |
import { updateFile } from '~/models/company/update-file' | |
/**
 * Creates an object-mode Transform that maps parsed FEC records to the rows
 * streamed into the `fec` table.
 *
 * Records whose entry month (`YYYY-MM` prefix of the formatted `ecritureDate`)
 * falls outside `[fromDateStr, untilDateStr]` are dropped. Only `companyId`,
 * `ecritureDate`, `compteNum`, `debit` and `credit` are emitted; all other
 * FEC columns are discarded.
 *
 * @param companyId - company every emitted row is attributed to
 * @param untilDateStr - last month kept, formatted `YYYY-MM` (inclusive)
 * @param fromDateStr - first month kept, formatted `YYYY-MM` (inclusive)
 */
const transformFec = (companyId: string, untilDateStr: string, fromDateStr: string) => {
  // Maps one record to its output row, or undefined when it is out of range.
  const toRow = ({ ecritureDate: ecritureDateRaw, compteNum, debit, credit }: Fec) => {
    const ecritureDate = formatFileDate(ecritureDateRaw)
    // 'YYYY-MM' strings sort chronologically, so plain string comparison works.
    const month = ecritureDate?.slice(0, 7)
    if (!month || month < fromDateStr || month > untilDateStr) return undefined
    return {
      companyId,
      ecritureDate,
      compteNum,
      // Decimal commas are rewritten as dots before the rows are COPY'd.
      debit: debit.replace(/,/g, '.'),
      credit: credit.replace(/,/g, '.'),
    }
  }

  return new Transform({
    objectMode: true,
    transform(record: Fec, _encoding, cb) {
      let row: ReturnType<typeof toRow>
      try {
        row = toRow(record)
      } catch (err) {
        // Node's stream contract: _transform errors must be passed to the
        // callback (not thrown) so pipeline() rejects for this file.
        cb(err instanceof Error ? err : new Error(String(err)))
        return
      }
      // cb is called outside the try so a downstream throw can't trigger a
      // second cb invocation.
      if (row === undefined) cb()
      else cb(null, row)
    },
  })
}
/** One file row as returned by `getFilesByCompanyId`. */
type CompanyFile = Awaited<ReturnType<typeof getFilesByCompanyId>>[number]

/**
 * Replaces the company's `fec` rows for the 12-month window ending at
 * `untilDate` with the entries read from the file's S3 object (key `data`).
 *
 * The DELETE and the COPY run on the same connection inside one
 * `sqlClient.begin` transaction, so the window is either fully replaced or
 * left untouched.
 *
 * @throws Error('File has no Body') when the S3 object has no body; S3, CSV
 *   parsing and database errors are propagated unchanged.
 */
async function ingestFecFile(
  companyId: Company['id'],
  { data, untilDate }: CompanyFile,
): Promise<void> {
  const untilDateJs = dayjs(untilDate)
  const untilDateStr = untilDateJs.format('YYYY-MM')
  // Start 11 months before untilDate so the inclusive window spans 12 months.
  const fromDateStr = untilDateJs.subtract(1, 'year').add(1, 'month').format('YYYY-MM')

  const obj = await s3.send(
    new GetObjectCommand({ Bucket: process.env.BUCKET_NAME, Key: data }),
  )
  if (!obj.Body) throw new Error('File has no Body')
  const body = obj.Body as Readable

  // Built with drizzle (schema column names) but executed on the transaction
  // connection below, so it commits or rolls back together with the COPY.
  const deleteWindow = drz
    .delete(fecs)
    .where(
      and(
        eq(fecs.companyId, companyId),
        sql`to_char(${fecs.ecritureDate}, 'YYYY-MM') <= ${untilDateStr}`,
        sql`to_char(${fecs.ecritureDate}, 'YYYY-MM') >= ${fromDateStr}`,
      ),
    )
    .toSQL()

  try {
    await sqlClient.begin(async (tx) => {
      await tx.unsafe(deleteWindow.sql, deleteWindow.params as Parameters<typeof tx.unsafe>[1])
      // Opened only after the DELETE succeeded, so a failed DELETE cannot
      // leave a connection stuck in COPY mode.
      const ingestStream =
        await tx`copy fec (company_id, credit, debit, compte_num, ecriture_date) from stdin`.writable()
      await pipeline(
        body,
        parse({
          delimiter: '\t',
          trim: true,
          skip_empty_lines: true,
          relax_column_count: true,
          relax_quotes: true,
          columns: FEC_COLUMNS,
          from: 2,
        }),
        transformFec(companyId, untilDateStr, fromDateStr),
        stringify({
          delimiter: '\t',
          columns: ['companyId', 'credit', 'debit', 'compteNum', 'ecritureDate'],
        }),
        ingestStream,
      )
    })
  } finally {
    // pipeline() destroys the body on its own errors; this also releases the
    // S3 socket when we failed before the pipeline started (e.g. the DELETE).
    body.destroy()
  }
}

/**
 * Ingests every pending FEC file of a company into the `fec` table.
 *
 * Does nothing when no file is pending. Otherwise the steps are:
 * 1. Mark the company Processing.
 * 2. Ingest the pending files one at a time, recording each file as Complete or Failed.
 * 3. Set the company to Pending if every file succeeded, or to Failed otherwise.
 *
 * @returns all company files when none is pending; otherwise the results of
 *   `updateFile` for each pending file.
 * @throws the first file's failure, after all statuses have been written.
 */
export async function processFiles(companyId: Company['id']) {
  const companyFiles = await getFilesByCompanyId(companyId)
  const pendingFiles = companyFiles.filter(({ status }) => status === FileStatus.Pending)
  if (!pendingFiles.length) {
    return companyFiles
  }

  await updateCompany(companyId, { status: CompanyStatus.Processing })

  // Sequential on purpose: each file deletes then re-inserts a date window of
  // the same company, and overlapping windows ingested concurrently can leave
  // duplicate rows. This also bounds DB connection usage to one at a time.
  const outcomes: { file: CompanyFile; result: PromiseSettledResult<void> }[] = []
  for (const file of pendingFiles) {
    try {
      await updateFile(file.id, { status: FileStatus.Processing })
      await ingestFecFile(companyId, file)
      outcomes.push({ file, result: { status: 'fulfilled', value: undefined } })
    } catch (reason) {
      outcomes.push({ file, result: { status: 'rejected', reason } })
    }
  }

  const processedFiles = await Promise.all(
    outcomes.map(({ file, result }) =>
      updateFile(file.id, {
        status: result.status === 'rejected' ? FileStatus.Failed : FileStatus.Complete,
        processedAt: result.status === 'fulfilled' ? new Date() : undefined,
        // String() also copes with non-Error rejections such as null/undefined.
        errorMessage: result.status === 'rejected' ? String(result.reason) : undefined,
      }),
    ),
  )

  const rejection = outcomes.map(({ result }) => result).find(isRejected)
  await updateCompany(companyId, {
    status: rejection ? CompanyStatus.Failed : CompanyStatus.Pending,
  })
  if (rejection) {
    const reason: unknown = rejection.reason
    // Rethrow the original Error (keeps message and stack) rather than
    // wrapping it, which produced messages like "Error: Error: ...".
    throw reason instanceof Error ? reason : new Error(String(reason))
  }
  return processedFiles
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment