Skip to content

Instantly share code, notes, and snippets.

@damianesteban
Created June 21, 2024 15:19
Show Gist options
  • Save damianesteban/2de9f2eb5b6239df4272e6a1440676e7 to your computer and use it in GitHub Desktop.
Save damianesteban/2de9f2eb5b6239df4272e6a1440676e7 to your computer and use it in GitHub Desktop.
data processor
import { Injectable } from '@nestjs/common';
import dayjs from 'dayjs';
import { PassThrough } from 'stream';
import { DataSource } from 'typeorm';
import { CandidHttpService } from './candid-http.service';
import { ClaimsExtract } from './entities/claims-extract.entity';
import { mapJsonToModel } from './utils';
// eslint-disable-next-line @typescript-eslint/no-var-requires
const csv = require('csv-parser');
@Injectable()
export class CandidDataExtractService {
  constructor(
    private readonly dataSource: DataSource,
    private readonly candidHttpService: CandidHttpService,
  ) {}

  /**
   * Bulk upsert rows into the claims_extract table.
   *
   * Rows whose `claimId` conflicts with an existing row are updated: every
   * other column of the ClaimsExtract entity is overwritten with the incoming
   * value.
   *
   * @param data - rows to insert or update; an empty array is a no-op.
   */
  async bulkUpsertData(data: any[]): Promise<void> {
    // Nothing to write: skip the round trip rather than issue an INSERT with no rows.
    if (data.length === 0) {
      return;
    }

    // Column names must come from the entity metadata. `Object.keys(ClaimsExtract)`
    // only sees the constructor's own enumerable (static) properties, not the
    // mapped columns, which would leave the ON CONFLICT ... DO UPDATE SET list empty.
    const metadata = this.dataSource.getMetadata(ClaimsExtract);
    const conflictColumn =
      metadata.findColumnWithPropertyName('claimId')?.databaseName ?? 'claimId';
    const overwriteColumns = metadata.columns
      .map((column) => column.databaseName)
      .filter((name) => name !== conflictColumn);

    await this.dataSource
      .createQueryBuilder()
      .insert()
      .into(ClaimsExtract)
      .values(data)
      .orUpdate(overwriteColumns, [conflictColumn])
      .execute();
  }

  /**
   * Downloads the Candid claims extract for the window starting three days ago
   * and ending yesterday (dates formatted YYYY-MM-DD), and maps each CSV row to
   * a ClaimsExtract model.
   *
   * The CSV is parsed as the download streams in, but every mapped row is kept
   * in memory until the stream ends.
   *
   * NOTE(review): whether `endDate` is inclusive depends on the Candid API — confirm.
   *
   * @returns the mapped rows, in file order.
   * @throws rejects if the download stream, the CSV parser, or row mapping fails.
   */
  async fetchAndMapClaimsData(): Promise<ClaimsExtract[]> {
    const threeDaysAgo = dayjs().subtract(3, 'day').format('YYYY-MM-DD');
    const yesterday = dayjs().subtract(1, 'day').format('YYYY-MM-DD');
    const downloadUrl = await this.candidHttpService.getAuthenticatedDownloadUrl({
      startDate: threeDaysAgo,
      endDate: yesterday,
    });
    const fileStream = await this.candidHttpService.getAuthenticatedFileData(downloadUrl);

    const csvStream = new PassThrough();

    return new Promise<ClaimsExtract[]>((resolve, reject) => {
      const models: ClaimsExtract[] = [];

      // Settle once and stop feeding the parser; a promise ignores any later
      // resolve/reject, so events that arrive after a failure are harmless.
      const fail = (err: unknown): void => {
        csvStream.destroy();
        reject(err);
      };

      // `pipe` does not forward 'error' events between streams, so each stage
      // needs its own handler; otherwise a failed download would leave this
      // promise pending forever.
      fileStream.on('error', fail);
      fileStream.pipe(csvStream);

      csvStream
        .on('error', fail)
        .pipe(csv())
        .on('data', (row) => {
          // A throw inside a stream listener would escape as an uncaught
          // exception instead of rejecting this promise.
          try {
            models.push(mapJsonToModel(row));
          } catch (err) {
            fail(err);
          }
        })
        .on('end', () => resolve(models))
        .on('error', fail);
    });
  }

  /**
   * Job entry point: fetches the recent claims extract and upserts it into
   * the claims_extract table.
   */
  async processClaimsDataJob(): Promise<void> {
    const data = await this.fetchAndMapClaimsData();
    await this.bulkUpsertData(data);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment