Created
June 21, 2024 15:19
-
-
Save damianesteban/2de9f2eb5b6239df4272e6a1440676e7 to your computer and use it in GitHub Desktop.
data processor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Injectable } from '@nestjs/common'; | |
import dayjs from 'dayjs'; | |
import { PassThrough } from 'stream'; | |
import { DataSource } from 'typeorm'; | |
import { CandidHttpService } from './candid-http.service'; | |
import { ClaimsExtract } from './entities/claims-extract.entity'; | |
import { mapJsonToModel } from './utils'; | |
// eslint-disable-next-line @typescript-eslint/no-var-requires | |
const csv = require('csv-parser'); | |
/**
 * Downloads a claims extract as CSV through `CandidHttpService`, maps each
 * row with `mapJsonToModel`, and upserts the result into the table backing
 * the `ClaimsExtract` entity.
 */
@Injectable()
export class CandidDataExtractService {
  constructor(
    private readonly dataSource: DataSource,
    private readonly candidHttpService: CandidHttpService,
  ) {}

  /**
   * Bulk upserts rows into the `ClaimsExtract` table.
   *
   * Rows that conflict on `claimId` have every other column overwritten
   * with the incoming value.
   *
   * @param data rows to insert or update; an empty (or missing) batch is a no-op
   */
  async bulkUpsertData(data: any[]): Promise<void> {
    // Nothing to write: avoid issuing an INSERT statement with no rows.
    if (!data || data.length === 0) {
      return;
    }

    // Resolve the overwrite list from the entity metadata. `Object.keys()` on
    // the entity class itself only lists static enumerable members of the
    // constructor, not the mapped columns, so it cannot be used here.
    // NOTE(review): `orUpdate` is given database column names; the conflict
    // target below is kept exactly as before — confirm it matches the actual
    // column name if the entity overrides it.
    const overwriteColumns = this.dataSource
      .getMetadata(ClaimsExtract)
      .columns.filter((column) => column.propertyName !== 'claimId')
      .map((column) => column.databaseName);

    await this.dataSource
      .createQueryBuilder()
      .insert()
      .into(ClaimsExtract)
      .values(data)
      .orUpdate(overwriteColumns, ['claimId'])
      .execute();
  }

  /**
   * Fetches the claims extract covering three days ago through yesterday
   * (both formatted as `YYYY-MM-DD`), parses it as CSV, and maps every row
   * with `mapJsonToModel`.
   *
   * @returns the mapped rows, in the order the parser emitted them
   * @throws rejects if the download stream, the intermediate stream, or the
   *   CSV parser emits an error, or if mapping a row throws
   */
  async fetchAndMapClaimsData(): Promise<ClaimsExtract[]> {
    const threeDaysAgo = dayjs().subtract(3, 'day').format('YYYY-MM-DD');
    const yesterday = dayjs().subtract(1, 'day').format('YYYY-MM-DD');

    const downloadUrl = await this.candidHttpService.getAuthenticatedDownloadUrl({
      startDate: threeDaysAgo,
      endDate: yesterday,
    });
    const fileStream = await this.candidHttpService.getAuthenticatedFileData(downloadUrl);

    // Feed the download through a PassThrough into the CSV parser so rows are
    // parsed as chunks arrive instead of after the whole file is buffered.
    const csvStream = new PassThrough();

    // Parsed rows are kept loosely typed: their shape is whatever the CSV
    // header dictates and is only given structure by `mapJsonToModel`.
    const rows = await new Promise<any[]>((resolve, reject) => {
      const collected: any[] = [];

      // `.pipe()` does not forward errors to the destination, so every stage
      // needs its own listener; otherwise an upstream failure would leave
      // this promise pending forever.
      fileStream.on('error', reject);
      csvStream.on('error', reject);

      fileStream.pipe(csvStream);
      csvStream
        .pipe(csv())
        .on('data', (row: any) => collected.push(row))
        .on('end', () => resolve(collected))
        .on('error', reject);
    });

    // Map after the stream has finished so that an exception thrown by the
    // mapper rejects this method's promise instead of escaping from an event
    // handler as an uncaught exception.
    return rows.map((row) => mapJsonToModel(row));
  }

  /**
   * Runs the full job: fetch and map the claims data, then bulk upsert it.
   */
  async processClaimsDataJob(): Promise<void> {
    const claims = await this.fetchAndMapClaimsData();
    await this.bulkUpsertData(claims);
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment