Connect Amplify DataStore with existing SQL datasources - Lambda Function
import * as AWS from 'aws-sdk'
import {
  createConnection,
  ConnectionOptions,
  Connection,
  RowDataPacket,
  OkPacket,
  ResultSetHeader,
} from 'mysql2/promise'
import * as schema from './schema'
import { ModelFieldType, UserSchema } from '@aws-amplify/datastore'
type SqlModel = {
  name: string
  table: string
  fields: string[]
  belongsTo?: { model: string; on: string; name: string }
}
type Row = {
  [column in string | number]: any
}
type reqArgs = {
  model: SqlModel
  models: { [key: string]: SqlModel }
  connection: Connection
}
type queryReqArgs = reqArgs & {
  args: {
    limit: number
    lastSync: number
    nextToken: string
  }
}
type MutationArgs = reqArgs & {
  args: {
    input: {
      id: string
      _version: number
      [key: string]: any
    }
  }
}
const { RDS_PROXY_URL, DATABASE, USERNAME, REGION } = process.env
const DeltaSyncConfig = {
  DeltaSyncTableTTL: 30, // 30 minutes
  BaseTableTTL: 30 * 24 * 60, // => 43200, 30 days in minutes
}
const MIN_TO_MILLI = 60 * 1_000
const DELTA_SYNC_PREFIX = 'DeltaSync'
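// NOTE: this follows the AppSync Delta Sync pattern. Each base table (e.g. `Posts`)
// is assumed to have a companion `DeltaSync<table>` table (e.g. `DeltaSyncPosts`).
// Full scans read the base table; incremental syncs whose `lastSync` falls within
// `DeltaSyncTableTTL` minutes read only the delta table, which is populated by
// `_writeToDeltaSyncTable` below.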
const signer = new AWS.RDS.Signer({
  region: REGION,
  port: 3306,
  username: USERNAME,
  hostname: RDS_PROXY_URL,
})
const initConn = () => {
  const connectionConfig: ConnectionOptions = {
    host: RDS_PROXY_URL,
    database: DATABASE,
    user: USERNAME,
    ssl: 'Amazon RDS',
    authPlugins: {
      mysql_clear_password: ({ connection, command }) => async (data) =>
        signer.getAuthToken({}),
    },
  }
  return createConnection(connectionConfig)
}
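// NOTE: the connection authenticates to the RDS Proxy with an IAM database
// authentication token instead of a static password. `RDS.Signer#getAuthToken`
// produces a short-lived token that the proxy accepts via the `mysql_clear_password`
// auth plugin, which is why the connection uses TLS (`ssl: 'Amazon RDS'`). The
// Lambda's execution role is assumed to have `rds-db:connect` permission on the proxy.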
const isObject = (o: any) => typeof o === 'object' && o !== null
const isModelField = (field: any) =>
  isObject(field.type) && typeof field.type.model === 'string'
const PARENT_PREFIX = '__PARENT__'
const PP_REGEX = /^__PARENT__/
const parentFields = (model: SqlModel) => {
  return model.fields
    .map((f) => `${model.table}.${f} as ${PARENT_PREFIX}${f}`)
    .join(', ')
}
const arrange = (row: Row, belongsToName: string) => {
  const child: Row = {}
  const belongsTo: Row = {}
  for (const [field, value] of Object.entries(row)) {
    if (field.startsWith(PARENT_PREFIX)) {
      belongsTo[field.replace(PP_REGEX, '')] = value
    } else {
      child[field] = value
    }
  }
  child[belongsToName] = toModel(belongsTo)
  return child
}
const deltaSyncTable = (baseTable: string) => DELTA_SYNC_PREFIX + baseTable
const toModel = (row: Row, model?: SqlModel) => {
  const mysql_id = row.id // include in log
  const id = row._datastore_uuid || `datastore-uuid-${row.id}`
  return {
    ...(model && model.belongsTo ? arrange(row, model.belongsTo.name) : row),
    mysql_id,
    id,
    _lastChangedAt: new Date(row._lastChangedAt).getTime(),
  }
}
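// NOTE: MySQL rows use an auto-increment `id`, while DataStore expects
// client-generated UUIDs. `toModel` therefore surfaces `_datastore_uuid` as the
// model's `id` (falling back to a synthetic `datastore-uuid-<n>` value for rows
// created outside DataStore) and keeps the numeric key as `mysql_id`.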
const _runQuery = async <
  T extends
    | RowDataPacket[][]
    | RowDataPacket[]
    | OkPacket
    | OkPacket[]
    | ResultSetHeader
>(
  conn: Connection,
  sql: string,
  values: any[]
) => {
  console.log(`execute sql >`)
  console.log(sql.trim().replace(/\s+/g, ' '))
  console.log(`with values >`)
  console.log(JSON.stringify(values, null, 2))
  const [result] = await conn.query<T>(sql, values)
  console.log(`result >`)
  console.log(JSON.stringify(result, null, 2))
  return result
}
type SelectRowArgs = {
  model: SqlModel
  models: { [key: string]: SqlModel }
  lookupId: string | number
  connection: Connection
}
const _selectRow = async ({
  model,
  models,
  lookupId,
  connection,
}: SelectRowArgs) => {
  let sql = null
  const table = model.table
  if (model.belongsTo) {
    const parent = models[model.belongsTo.model]
    const pTable = parent.table
    sql = `
      SELECT ${table}.*, ${parentFields(parent)}
      FROM ${table}
      LEFT JOIN ${pTable} ON ${table}.${model.belongsTo.on} = ${pTable}.id
      WHERE ${table}.id = ?`
  } else {
    sql = `SELECT * FROM ${table} WHERE id = ?`
  }
  const values = [lookupId]
  // RETRIEVE the row and potential parent
  const [row] = await _runQuery<RowDataPacket[]>(connection, sql, values)
  return row
}
type WriteToDeltaSyncArgs = {
  row: Row
  model: SqlModel
  connection: Connection
}
const _writeToDeltaSyncTable = async ({
  row,
  model,
  connection,
}: WriteToDeltaSyncArgs) => {
  const ds = Object.assign({}, row)
  delete ds.id
  delete ds._ttl
  delete ds.parentUUID
  delete ds.parentDeleted
  if (model.belongsTo) {
    for (const key of Object.keys(ds)) {
      if (key.startsWith(PARENT_PREFIX)) {
        delete ds[key]
      }
    }
  }
  const keys = Object.keys(ds)
  const sql = `INSERT INTO ${deltaSyncTable(model.table)} (${keys.join(
    ','
  )}, _ttl) VALUES(${keys
    .map((k) => '?')
    .join(',')}, TIMESTAMPADD(MINUTE, ?, CURRENT_TIMESTAMP(3)))`
  const values = keys.map((k) => ds[k])
  values.push(DeltaSyncConfig.DeltaSyncTableTTL)
  return await _runQuery<ResultSetHeader>(connection, sql, values)
}
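// NOTE: every successful mutation also appends the resulting row to the
// `DeltaSync<table>` table with a `_ttl` of `DeltaSyncTableTTL` minutes.
// Incremental syncs (see `_query`) read from that table, so it only needs to
// retain recent changes; pruning rows whose `_ttl` has passed is assumed to be
// handled outside this function (e.g. by a scheduled cleanup job).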
type UpdateWithRowLockArgs = {
  sql: string
  values: any[]
  uuid: string
  connection: Connection
  model: SqlModel
  models: { [key: string]: SqlModel }
}
const _doUpdateTransactionWithRowLock = async ({
  sql,
  values,
  uuid,
  connection,
  model,
  models,
}: UpdateWithRowLockArgs) => {
  // START TRANSACTION to lock the row
  await connection.query(`START TRANSACTION`)
  const table = model.table
  // LOCK the row (shared lock) and resolve its numeric id from the DataStore uuid
  const locksql = `SELECT id FROM ${table} WHERE _datastore_uuid=? LOCK IN SHARE MODE;`
  const [existing] = await _runQuery<RowDataPacket[]>(connection, locksql, [
    uuid,
  ])
  // UPDATE the row - op specific
  const result = await _runQuery<ResultSetHeader>(connection, sql, values)
  const row = await _selectRow({
    lookupId: existing.id,
    model,
    models,
    connection,
  })
  // FINALLY COMMIT
  await connection.query('COMMIT;')
  if (result.affectedRows !== 1) {
    // the operation did not update a row (version mismatch): return the current
    // row along with an unhandled conflict error
    console.error('Error: version mismatch on item')
    return {
      data: toModel(row, model),
      errorMessage: 'Conflict',
      errorType: 'ConflictUnhandled',
    }
  }
  // WRITE the updated row to the DeltaSync table
  if (row && row.id) {
    await _writeToDeltaSyncTable({ row, model, connection })
  }
  return { data: toModel(row, model) }
}
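// NOTE: conflict detection relies on optimistic locking. The UPDATE issued by
// `_update`/`_delete` only matches when the caller's `_version` equals the stored
// `_version`, and increments it on success. When `affectedRows` is 0 the versions
// diverged, so the handler returns the current row together with a
// `ConflictUnhandled` error so that the calling AppSync resolver can report the
// conflict back to DataStore.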
const _query = async ({
  args: { limit = 1_000, lastSync, nextToken: inNextToken },
  model,
  models,
  connection,
}: queryReqArgs) => {
  const startedAt = Date.now()
  const moment = startedAt - DeltaSyncConfig.DeltaSyncTableTTL * MIN_TO_MILLI
  let sql
  let values = []
  let offset = 0
  if (inNextToken) {
    const tokenInfo = JSON.parse(Buffer.from(inNextToken, 'base64').toString())
    offset = tokenInfo.offset
  }
  const table = model.table
  if (model.belongsTo) {
    const parent = models[model.belongsTo.model]
    const pTable = parent.table
    sql = `
      SELECT ${table}.*, ${parentFields(parent)}
      FROM ${table}
      LEFT JOIN ${pTable} ON ${table}.${model.belongsTo.on} = ${pTable}.id`
  } else {
    sql = `SELECT * FROM ${table}`
  }
  if (lastSync === undefined) {
    // lastSync is not specified: scan the base table
    sql += ` ORDER BY ${table}.id LIMIT ?, ?`
    values = [offset, limit]
  } else if (lastSync < moment) {
    // lastSync is older than now minus DeltaSyncTableTTL: scan the base table,
    // filtered to rows changed since lastSync
    sql += ` WHERE ${table}._lastChangedAt > FROM_UNIXTIME(?/1000) ORDER BY ${table}.id LIMIT ?, ?`
    values = [lastSync, offset, limit]
  } else {
    // lastSync falls within the DeltaSync window: query the delta table instead
    const dsTable = deltaSyncTable(table)
    if (model.belongsTo) {
      const parent = models[model.belongsTo.model]
      const pTable = parent.table
      sql = `
        SELECT ${dsTable}.*, ${parentFields(parent)}
        FROM ${dsTable}
        LEFT JOIN ${pTable} ON ${dsTable}.${model.belongsTo.on} = ${pTable}.id`
    } else {
      sql = `SELECT ${dsTable}.* FROM ${dsTable}`
    }
    sql += ` WHERE ${dsTable}._lastChangedAt > FROM_UNIXTIME(?/1000) ORDER BY ${dsTable}.id LIMIT ?, ?`
    values = [lastSync, offset, limit]
  }
  // FETCH the rows
  const rows = await _runQuery<RowDataPacket[]>(connection, sql, values)
  // EVALUATE next token
  let nextToken = null
  if (rows.length >= limit) {
    nextToken = Buffer.from(
      JSON.stringify({ offset: offset + rows.length })
    ).toString('base64')
  }
  const items = rows.map((row) => toModel(row, model))
  const response = { data: { items, startedAt, nextToken } }
  return response
}
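// NOTE: pagination is offset-based. The `nextToken` returned to the client is just
// a base64-encoded JSON object, e.g.
//   Buffer.from(JSON.stringify({ offset: 1000 })).toString('base64')
// which `_query` decodes on the next page to continue the `LIMIT ?, ?` scan.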
const _create = async ({
  args: { input },
  connection,
  model,
  models,
}: MutationArgs) => {
  const { id, ...rest } = input
  const item: { [key: string]: any } = { ...rest, _datastore_uuid: id }
  const table = model.table
  if (model.belongsTo) {
    const parent = models[model.belongsTo.model]
    const pTable = parent.table
    // fetch the numeric id of the belongsTo parent from its DataStore uuid
    const sql = `SELECT id FROM ${pTable} WHERE _datastore_uuid = ?`
    const values = [item[model.belongsTo.on] as string]
    const [row] = await _runQuery<RowDataPacket[]>(connection, sql, values)
    item[model.belongsTo.on] = row.id
  }
  const keys = Object.keys(item)
  const sql = `INSERT INTO ${table} (${keys.join(',')}) VALUES(${keys
    .map((k) => '?')
    .join(',')})`
  const values = keys.map((k) => item[k])
  // INSERT the new row
  const result = await _runQuery<ResultSetHeader>(connection, sql, values)
  const row = await _selectRow({
    lookupId: result.insertId,
    connection,
    model,
    models,
  })
  // WRITE the new row to the DeltaSync table if it was created
  if (row && row.id) {
    await _writeToDeltaSyncTable({ row, model, connection })
  }
  return { data: toModel(row, model) }
}
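// NOTE: DataStore sends the parent's UUID in the belongsTo foreign-key field
// (e.g. `postID`, depending on your schema), so `_create` first translates that
// UUID into the parent's MySQL auto-increment id before inserting the child row.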
const _update = async ({
  args: { input },
  connection,
  model,
  models,
}: MutationArgs) => {
  const { id: uuid, _version = 0, ...item } = input
  const keys = Object.keys(item)
  const table = model.table
  const sql = `UPDATE ${table} SET ${keys
    .map((k) => k + ' = ?')
    .join(
      ', '
    )}, _version=_version+1 WHERE _datastore_uuid = ? AND _version = ?`
  const values = keys.map((k) => item[k])
  values.push(uuid)
  values.push(_version)
  return await _doUpdateTransactionWithRowLock({
    sql,
    values,
    uuid,
    connection,
    model,
    models,
  })
}
const _delete = async ({
  args: { input },
  connection,
  model,
  models,
}: MutationArgs) => {
  const { id: uuid, _version = 0 } = input
  const table = model.table
  const sql = `
    UPDATE ${table} SET _deleted=true, _version=_version+1, _ttl = TIMESTAMPADD(MINUTE, ?, CURRENT_TIMESTAMP(3))
    WHERE _datastore_uuid = ? AND _version = ?`
  const values = [DeltaSyncConfig.BaseTableTTL, uuid, _version]
  return await _doUpdateTransactionWithRowLock({
    sql,
    values,
    uuid,
    connection,
    model,
    models,
  })
}
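// NOTE: deletes are soft deletes, as DataStore's sync protocol requires. The row is
// flagged with `_deleted = true` (so other clients learn about the deletion on their
// next sync) and given a base-table `_ttl` of `BaseTableTTL` minutes (30 days);
// physically removing expired tombstones is presumably handled by a separate job.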
const processSchema = (schema: UserSchema) => {
  const schemaDetails: {
    operations: { [key: string]: { fn: Function; name: string } }
    models: { [key: string]: SqlModel }
  } = { operations: {}, models: {} }
  if (!schema || !schema.models) return schemaDetails
  return Object.entries(schema.models)
    .filter(([, model]) => model.syncable)
    .reduce((prev, [, model]) => {
      const name = model.name
      const table = model.pluralName
      const modelFields = Object.entries(model.fields)
        .filter(([fKey, field]) => !isModelField(field))
        .map(([fKey]) => fKey)
      const [, belongsToField] = Object.entries(model.fields).find(
        ([, field]) =>
          isModelField(field) &&
          field.association?.connectionType === 'BELONGS_TO'
      ) || [null, null]
      prev.operations[`sync${table}`] = { fn: _query, name }
      prev.operations[`create${name}`] = { fn: _create, name }
      prev.operations[`update${name}`] = { fn: _update, name }
      prev.operations[`delete${name}`] = { fn: _delete, name }
      prev.models[name] = {
        name,
        table,
        ...(belongsToField
          ? {
              belongsTo: {
                model: (belongsToField.type as ModelFieldType).model,
                name: belongsToField.name,
                on: (belongsToField.association as any)?.targetName,
              },
            }
          : null),
        fields: [
          ...new Set([
            '_version',
            '_deleted',
            '_lastChangedAt',
            '_ttl',
            'createdAt',
            'updatedAt',
            '_datastore_uuid',
            ...modelFields,
          ]),
        ],
      }
      return prev
    }, schemaDetails)
}
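// NOTE: `processSchema` turns the Amplify-generated schema (imported from ./schema)
// into a routing table keyed by GraphQL field name. For a model named `Post` with
// plural name `Posts` (an illustrative example), the handler ends up serving
// `syncPosts`, `createPost`, `updatePost`, and `deletePost`, and the MySQL table
// name is assumed to match the model's plural name.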
const { operations, models } = processSchema(schema.schema)
console.log(`operations >`, JSON.stringify(operations, null, 2))
console.log(`models >`, JSON.stringify(models, null, 2))
exports.handler = async (event: { fieldName: string; arguments: any }) => {
  try {
    console.log(`passed event >`, JSON.stringify(event, null, 2))
    const { fieldName: operation, arguments: args } = event
    if (operation in operations) {
      const connection = await initConn()
      const { fn, name } = operations[operation]
      const model = models[name]
      const result = await fn.apply(undefined, [
        { args, connection, model, models },
      ])
      await connection.end()
      console.log(`final step result >`, JSON.stringify(result, null, 2))
      return result
    }
    // no matching operation: surface an explicit error instead of returning undefined
    console.error(`Error: unknown operation >`, operation)
    return {
      data: null,
      errorMessage: `Unknown operation: ${operation}`,
      errorType: 'InternalFailure',
    }
  } catch (error) {
    console.log(`Error: unhandled error >`, JSON.stringify(error, null, 2))
    return {
      data: null,
      errorMessage: error.message || JSON.stringify(error),
      errorType: 'InternalFailure',
    }
  }
}
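// Illustrative example of the event this handler receives when attached as a Lambda
// data source behind AppSync sync/mutation resolvers; the field and argument names
// depend on your GraphQL schema:
//
// {
//   "fieldName": "syncPosts",
//   "arguments": { "limit": 100, "lastSync": 1621954000000, "nextToken": null }
// }
//
// {
//   "fieldName": "createPost",
//   "arguments": {
//     "input": { "id": "4f0f4f3c-...", "title": "Hello", "_version": 0 }
//   }
// }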