Created
September 22, 2023 22:17
-
-
Save fl0wo/5afdf560ae0a6453852b278f01d61f29 to your computer and use it in GitHub Desktop.
Simple way to save candlesticks with batch-write multi-type into Timestream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { OHLCV } from 'ccxt'; | |
import { getInsertRecordParameterToTimeStream, writeBatchToTimestream } from './write'; | |
import { SymbolAndCandles } from '../lambda/candle-fetcher/models'; | |
import { | |
_Record, | |
Dimension, | |
MeasureValue, | |
MeasureValueType, TimestreamWriteClient, | |
WriteRecordsCommand, WriteRecordsCommandOutput, | |
WriteRecordsRequest, | |
} from '@aws-sdk/client-timestream-write'; | |
/**
 * Builds a Timestream `WriteRecordsRequest` that holds exactly one multi-measure record.
 *
 * Each entry of `fields` becomes one DOUBLE measure value of that record, and the record
 * carries a single dimension (`fieldDimensionName` = `fieldDimensionValue`).
 *
 * @param time - Record timestamp. It is passed to `getSafeNull` (defined elsewhere) together
 *   with `Date.now()`; presumably the latter is used as the fallback when `time` is missing.
 * @param fieldDimensionName - Name of the record's only dimension.
 * @param fieldDimensionValue - Value of the record's only dimension.
 * @param fields - Label/value pairs; every value is stringified and stored as a DOUBLE measure.
 * @param fieldsMeasureName - Measure name given to the multi-measure record.
 * @param databaseName - Target Timestream database.
 * @param tableName - Target Timestream table.
 * @returns An object whose `params` property is the assembled request.
 */
export const getInsertRecordParameterToTimeStream = (
  time: number | undefined,
  fieldDimensionName: string,
  fieldDimensionValue: string,
  fields: Array<{
    label: string;
    value: number;
  }>,
  fieldsMeasureName: string,
  databaseName: string,
  tableName: string,
) => {
  const currentTime = getSafeNull(time, Date.now());

  const dimension: Dimension = {
    Name: fieldDimensionName,
    Value: fieldDimensionValue,
  };

  const measures = fields.map((el): MeasureValue => ({
    Name: el.label,
    Type: MeasureValueType.DOUBLE,
    Value: String(el.value),
  }));

  // The record is written inline (instead of through a `Record<any, any>` variable) so the
  // compiler checks every property against the record shape expected by `WriteRecordsRequest`.
  const params: WriteRecordsRequest = {
    DatabaseName: databaseName,
    TableName: tableName,
    Records: [
      {
        Dimensions: [dimension],
        MeasureName: fieldsMeasureName,
        MeasureValues: measures,
        MeasureValueType: MeasureValueType.MULTI,
        Time: currentTime.toString(),
        // The timestamp doubles as the record version.
        Version: currentTime,
      },
    ],
  };

  return { params };
};
/**
 * Translates one OHLCV candle of `symbol` into the argument set consumed by
 * `getInsertRecordParameterToTimeStream`.
 *
 * Candle index 0 becomes the record time; indices 1 to 5 are stored under the labels
 * `o`, `h`, `l`, `c` and `v`. Database and table names are read from the
 * `TIMESTREAM_DB_NAME` and `TIMESTREAM_TABLE_NAME` environment variables.
 *
 * @param candle - The candle tuple to convert.
 * @param symbol - Used as the value of the `crypto` dimension.
 * @throws Error when either environment variable is null or undefined.
 */
const extractFromOHLCVToTimeStreamInsertion = (candle: OHLCV, symbol: string) => {
  // Environment lookups stay first so a missing variable is reported before the candle is read.
  const databaseName = getSafeOrThrow(process.env.TIMESTREAM_DB_NAME, 'TIMESTREAM_DB_NAME CANNOT BE NULL');
  const tableName = getSafeOrThrow(process.env.TIMESTREAM_TABLE_NAME, 'TIMESTREAM_TABLE_NAME CANNOT BE NULL');

  const [time, open, high, low, close, volume] = candle;

  return {
    databaseName,
    tableName,
    fieldDimensionName: 'crypto',
    fieldDimensionValue: symbol,
    fieldsMeasureName: 'ohlcv',
    fields: [
      { label: 'o', value: open },
      { label: 'h', value: high },
      { label: 'l', value: low },
      { label: 'c', value: close },
      { label: 'v', value: volume },
    ],
    time,
  };
};
/**
 * Converts every candle of a single symbol into its own one-record write request.
 *
 * @param el - A symbol together with its candles.
 * @returns One `WriteRecordsRequest` per candle, in the same order as `el.candles`.
 */
const candlesToInsertRecordTimeStreamParams = (el: SymbolAndCandles) =>
  el.candles.map((candle: OHLCV) => {
    const {
      time,
      fieldDimensionName,
      fieldDimensionValue,
      fields,
      fieldsMeasureName,
      databaseName,
      tableName,
    } = extractFromOHLCVToTimeStreamInsertion(candle, el.symbol);

    return getInsertRecordParameterToTimeStream(
      time,
      fieldDimensionName,
      fieldDimensionValue,
      fields,
      fieldsMeasureName,
      databaseName,
      tableName,
    ).params;
  });
/**
 * Persists the candles of several symbols to Timestream.
 *
 * Every candle is turned into a write request by `candlesToInsertRecordTimeStreamParams`
 * and the whole set is handed to `writeBatchToTimestream`.
 *
 * @param allCandles - Candles grouped by symbol.
 * @returns Whatever `writeBatchToTimestream` resolves to, or an empty array when
 *   `allCandles` is empty (nothing is written in that case).
 */
export const saveBatchedCandles = async (allCandles: SymbolAndCandles[]) => {
  if (allCandles.length === 0) {
    console.log('No candles to save');
    // An async function already wraps its return value in a promise.
    return [];
  }

  const allInsertRecordTimeStreamParams = allCandles.map(candlesToInsertRecordTimeStreamParams);
  return writeBatchToTimestream(allInsertRecordTimeStreamParams);
};
/**
 * Returns `value` untouched unless it is `null` or `undefined`.
 *
 * Other falsy values (`0`, `''`, `false`) are considered valid and pass through.
 *
 * @param value - The value to check.
 * @param msg - Message of the error thrown for a missing value.
 * @returns `value`, typed without `null` / `undefined`.
 * @throws Error with message `msg` when `value` is `null` or `undefined`.
 */
export const getSafeOrThrow = <T>(value: T, msg: string): NonNullable<T> => {
  // `== null` matches exactly null and undefined.
  if (value == null) {
    throw new Error(msg);
  }
  return value as NonNullable<T>;
};
/**
 * Merges the records of many write requests and sends them to Timestream in batches of 100.
 *
 * The target database and table are taken from the first request that names one; the
 * `TIMESTREAM_DB_NAME` / `TIMESTREAM_TABLE_NAME` environment variables are handed to
 * `getSafeNull` (defined elsewhere) alongside them, presumably as the fallback.
 * All records go to that single database/table, even if later requests name a different one.
 *
 * @param params - Write requests grouped in nested arrays (e.g. one inner array per symbol).
 * @returns The outputs collected by `saveToTimeStreamInBatches`.
 */
export const writeBatchToTimestream = async (params: WriteRecordsRequest[][]) => {
  const writeClient = loadWriteTimeStreamClientBulk();
  const allWriteRecordRequests = params.flat();

  const DatabaseName = getSafeNull(
    allWriteRecordRequests.find((el) => !!el.DatabaseName)?.DatabaseName,
    process.env.TIMESTREAM_DB_NAME,
  );
  const TableName = getSafeNull(
    allWriteRecordRequests.find((el) => !!el.TableName)?.TableName,
    process.env.TIMESTREAM_TABLE_NAME,
  );

  // Requests without `Records` contribute nothing; flatMap keeps the result typed as
  // `_Record[]` without needing a `@ts-ignore`.
  const records: _Record[] = allWriteRecordRequests.flatMap((el) => el.Records ?? []);

  console.log('Sending', records.length, ' total candles');

  return saveToTimeStreamInBatches(
    DatabaseName,
    TableName,
    records,
    writeClient,
    100,
  );
};
/**
 * Writes `records` to one Timestream table, `batchSize` records per request.
 *
 * Chunks produced by `splitArrayInChunks` are written one after another; each write is
 * awaited before the next starts. A rejected write rejects the returned promise, so the
 * remaining chunks are not sent.
 *
 * @param DatabaseName - Target database.
 * @param TableName - Target table.
 * @param records - Records to write.
 * @param writeClient - Client used to send the requests.
 * @param batchSize - Chunk size passed to `splitArrayInChunks` (default 100).
 * @returns The truthy outputs of the individual writes, in chunk order.
 */
export default async function saveToTimeStreamInBatches(
  DatabaseName: string,
  TableName: string,
  records: _Record[],
  writeClient: TimestreamWriteClient,
  batchSize: number = 100,
): Promise<WriteRecordsCommandOutput[]> {
  const chunks: _Record[][] = splitArrayInChunks(batchSize, records);
  const completed: WriteRecordsCommandOutput[] = [];

  for (const chunk of chunks) {
    const result = await writeRecordsToTimeStream(DatabaseName, TableName, chunk, writeClient);
    if (result) {
      completed.push(result);
    }
  }

  return completed;
}
/**
 * Sends a single `WriteRecordsCommand` containing all of `records`.
 *
 * @param DatabaseName - Target database.
 * @param TableName - Target table.
 * @param records - Records placed in the request; they may mix candle types and symbols.
 * @param writeClient - Client used to send the command.
 * @returns The output of the write call.
 */
async function writeRecordsToTimeStream(
  DatabaseName: string,
  TableName: string,
  records: _Record[],
  writeClient: TimestreamWriteClient,
) {
  // Everything goes out in one request, regardless of candle type or symbol.
  const request: WriteRecordsRequest = { DatabaseName, TableName, Records: records };
  const output = await writeClient.send(new WriteRecordsCommand(request));
  return output;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment