Skip to content

Instantly share code, notes, and snippets.

@fl0wo
Created September 22, 2023 22:17
Show Gist options
  • Save fl0wo/5afdf560ae0a6453852b278f01d61f29 to your computer and use it in GitHub Desktop.
Save fl0wo/5afdf560ae0a6453852b278f01d61f29 to your computer and use it in GitHub Desktop.
Simple way to save candlesticks with batch-write multi-type into Timestream
import { OHLCV } from 'ccxt';
import { getInsertRecordParameterToTimeStream, writeBatchToTimestream } from './write';
import { SymbolAndCandles } from '../lambda/candle-fetcher/models';
import {
_Record,
Dimension,
MeasureValue,
MeasureValueType, TimestreamWriteClient,
WriteRecordsCommand, WriteRecordsCommandOutput,
WriteRecordsRequest,
} from '@aws-sdk/client-timestream-write';
export const getInsertRecordParameterToTimeStream = (
time: number | undefined,
fieldDimensionName: string,
fieldDimensionValue: string,
fields: Array<{
label: string;
value: number;
}>,
fieldsMeasureName: string,
databaseName: string,
tableName: string) => {
const currentTime = getSafeNull(time, Date.now());
const dimension: Dimension = {
Name: fieldDimensionName,
Value: fieldDimensionValue,
};
const measures: MeasureValue[] = fields.map((el) => {
const measure: MeasureValue = {
Name: el.label,
Type: MeasureValueType.DOUBLE,
Value: String(el.value),
};
return measure;
});
const toInsertRecord: Record<any, any> = {
Dimensions: [dimension],
MeasureName: fieldsMeasureName,
MeasureValues: measures,
MeasureValueType: MeasureValueType.MULTI,
Time: currentTime.toString(),
Version: currentTime,
};
const params: WriteRecordsRequest = {
DatabaseName: databaseName,
TableName: tableName,
Records: [toInsertRecord],
};
return { params };
};
const extractFromOHLCVToTimeStreamInsertion = (candle: OHLCV, symbol: string) => {
return {
databaseName: getSafeOrThrow(process.env.TIMESTREAM_DB_NAME, 'TIMESTREAM_DB_NAME CANNOT BE NULL'),
tableName: getSafeOrThrow(process.env.TIMESTREAM_TABLE_NAME, 'TIMESTREAM_TABLE_NAME CANNOT BE NULL'),
fieldDimensionName: 'crypto',
fieldDimensionValue: symbol,
fieldsMeasureName: 'ohlcv',
fields: [{
label: 'o',
value: candle[1],
}, {
label: 'h',
value: candle[2],
}, {
label: 'l',
value: candle[3],
}, {
label: 'c',
value: candle[4],
}, {
label: 'v',
value: candle[5],
}],
time: candle[0],
};
};
const candlesToInsertRecordTimeStreamParams = (el: SymbolAndCandles) => el.candles.map((candle: OHLCV) => {
const toInsertInfo = extractFromOHLCVToTimeStreamInsertion(candle, el.symbol);
const { params } = getInsertRecordParameterToTimeStream(
toInsertInfo.time,
toInsertInfo.fieldDimensionName,
toInsertInfo.fieldDimensionValue,
toInsertInfo.fields,
toInsertInfo.fieldsMeasureName,
toInsertInfo.databaseName,
toInsertInfo.tableName,
);
return params;
},
);
export const saveBatchedCandles = async (allCandles: SymbolAndCandles[]) => {
if (allCandles.length === 0) {
console.log('No candles to save')
return Promise.resolve([]);
}
const allInsertRecordTimeStreamParams = allCandles
.map(candlesToInsertRecordTimeStreamParams);
return writeBatchToTimestream(allInsertRecordTimeStreamParams);
};
export const getSafeOrThrow = <T> (value:T, msg:string):NonNullable<T> => {
if (value===undefined || value===null) throw Error(msg);
return <NonNullable<T>> value;
};
export const writeBatchToTimestream = async (params: WriteRecordsRequest[][]) => {
const writeClient = loadWriteTimeStreamClientBulk();
const allWriteRecordRequests = params.flat();
const DatabaseName = getSafeNull(
allWriteRecordRequests.find(el=>!!el.DatabaseName)?.DatabaseName,
process.env.TIMESTREAM_DB_NAME,
);
const TableName = getSafeNull(
allWriteRecordRequests.find(el=>!!el.TableName)?.TableName,
process.env.TIMESTREAM_TABLE_NAME,
);
// @ts-ignore
const records: _Record[] = allWriteRecordRequests.map(
(el) => el.Records,
)
.flat()
.filter((el) => !!el);
console.log('Sending', records.length, ' total candles');
return saveToTimeStreamInBatches(
DatabaseName,
TableName,
records,
writeClient,
100,
);
};
export default async function saveToTimeStreamInBatches(
DatabaseName: string,
TableName: string,
records: _Record[],
writeClient: TimestreamWriteClient,
batchSize: number = 100) {
const batches: _Record[][] = splitArrayInChunks(batchSize, records);
const operations:WriteRecordsCommandOutput[] = [];
for (const batch of batches) {
const outPut = await writeRecordsToTimeStream(
DatabaseName,
TableName,
batch,
writeClient,
);
if (!!outPut) {
operations.push(outPut);
}
}
return operations;
}
async function writeRecordsToTimeStream(
DatabaseName: string,
TableName: string,
records: _Record[],
writeClient: TimestreamWriteClient) {
const oneRequest: WriteRecordsRequest = {
DatabaseName: DatabaseName,
TableName: TableName,
Records: records,
};
// One shot all candles of different types and symbols
const command = new WriteRecordsCommand(oneRequest);
return await writeClient.send(command)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment