Created
November 26, 2024 20:37
-
-
Save maietta/078b1b99b9149117cc6329077ffb1a86 to your computer and use it in GitHub Desktop.
Example real-time replication from Pocketbase to MySQL with different data schemas.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Container image that compiles realtime.ts with Bun and runs the resulting binary.
# Base image: oven/bun, `latest` tag (not pinned to a specific version).
FROM oven/bun:latest | |
# All following instructions run relative to /app.
WORKDIR /app | |
# Copy the whole build context (sources and package.json) into /app.
COPY . . | |
# Install the dependencies declared in package.json.
RUN bun install | |
# Build realtime.ts with --compile, writing the output to /app/realtime.
RUN bun build ./realtime.ts --compile --outfile realtime | |
# Make sure the compiled output is executable.
RUN chmod +x /app/realtime | |
# The container's process is the compiled program itself.
ENTRYPOINT ["/app/realtime"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"dependencies": { | |
"eventsource": "^2.0.2", | |
"mysql2": "^3.11.4", | |
"pocketbase": "^0.22.0" | |
}, | |
"devDependencies": { | |
"@types/bun": "^1.1.14", | |
"@types/eventsource": "^1.1.15" | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import PocketBase from 'pocketbase'; | |
import mysql from 'mysql2/promise'; | |
import eventsource from 'eventsource'; | |
/*
 * Configuration is taken from the environment. Create a .env file with the two
 * variables below and adjust the program as desired; this is only an example
 * of usage.
 *
 *   POCKETBASE_URL=https://<pocketbase>/
 *   MYSQL_URI=mysql://<user>:<password>@<ip>/<database>
 */
const POCKETBASE_URL = process.env.POCKETBASE_URL!;
const MYSQL_URI = process.env.MYSQL_URI!;

// Listing class -> MySQL table. Every table is named `Property_<class>`.
const CLASS_TO_TABLE: Record<string, string> = Object.fromEntries(
  ['RE_1', 'LD_2', 'CI_3', 'MF_4'].map((listingClass) => [
    listingClass,
    `Property_${listingClass}`,
  ]),
);
// Columns that are stored as whole numbers.
const WHOLE_NUMBER_FIELD = /^(L_Price|L_Area|L_SqFt)$/;

/**
 * Converts one value from a record's JSON `data` into something that can be
 * bound to a `?` placeholder in a MySQL query.
 *
 * Rules, applied in this order:
 * - `L_Price`, `L_Area`, `L_SqFt`: blank or non-finite input becomes `null`;
 *   anything else is rounded to a whole number and returned as a string.
 * - Any key containing `Date`: blank input becomes the zero date
 *   `'0000-00-00'`, an unparseable value becomes `null` (with a warning), and
 *   a valid one is returned as a UTC `YYYY-MM-DD HH:MM:SS` string.
 * - Booleans become `1` / `0`.
 * - `''`, `null` and `undefined` become `null`.
 * - Arrays and plain objects are serialised to a JSON string.
 * - Everything else is returned unchanged.
 *
 * NOTE(review): the zero date is kept from the original behaviour; MySQL
 * servers running with NO_ZERO_DATE in strict mode reject it, so confirm it
 * matches the target schema.
 *
 * @param key   Column name the value belongs to; selects the rule.
 * @param value Raw value taken from the JSON payload.
 * @returns The value to bind for this column.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- values come from untyped JSON
function formatValueForMySQL(key: string, value: any) {
  const isBlank = value === '' || value === null || value === undefined;

  if (WHOLE_NUMBER_FIELD.test(key)) {
    if (isBlank) {
      return null;
    }
    const numberValue = Number(value);
    // isFinite (rather than !isNaN) also rejects +/-Infinity, which toFixed
    // would otherwise turn into the string "Infinity".
    return Number.isFinite(numberValue) ? numberValue.toFixed(0) : null;
  }

  if (key.includes('Date')) {
    if (isBlank) {
      return '0000-00-00';
    }
    const date = new Date(value);
    if (isNaN(date.getTime())) {
      console.warn(`Invalid date for field ${key}: ${value}`);
      return null;
    }
    // "2024-01-31T12:34:56.000Z" -> "2024-01-31 12:34:56"
    return date.toISOString().slice(0, 19).replace('T', ' ');
  }

  if (typeof value === 'boolean') {
    return value ? 1 : 0;
  }

  if (isBlank) {
    return null;
  }

  // Nested JSON must be bound as a single string: when given an array or a
  // plain object, the driver's query() formatting expands it into a value list
  // or `key = value` pairs, which corrupts the surrounding INSERT statement.
  // Dates and binary views are left alone because the driver handles them.
  if (typeof value === 'object' && !(value instanceof Date) && !ArrayBuffer.isView(value)) {
    return JSON.stringify(value);
  }

  return value;
}
/** Wraps a MySQL identifier in backticks, doubling any backtick it contains. */
function quoteIdentifier(name: string): string {
  return '`' + name.replace(/`/g, '``') + '`';
}

/**
 * Mirrors a single PocketBase listing record into MySQL.
 *
 * The record's `class` selects the target table through `CLASS_TO_TABLE`;
 * records with an unmapped class are skipped with a warning. Then:
 * - status `"SOLD"`: the listing row and its `photo_processing` row are deleted;
 * - any other status: every key of `data` except `L_Concessions` is upserted
 *   as a column (with `L_StatusCatID` taken from `record.status`), and, when
 *   `photos` is present, it is upserted as JSON into `photo_processing`.
 *
 * Query failures are logged and swallowed so one bad record does not stop
 * replication.
 *
 * @param pool   MySQL pool used for every statement.
 * @param record PocketBase record; the fields read are `class`, `status`,
 *               `data`, `photos`, `mlsid` and `id`.
 */
async function replicateData(pool: mysql.Pool, record: any) {
  const { class: recordClass, data, photos } = record;
  const tableName = CLASS_TO_TABLE[recordClass];
  if (!tableName) {
    console.warn(`No table mapping found for class: ${recordClass}`);
    return;
  }

  // Both branches read data.L_ListingID / iterate data, so a record without a
  // JSON object payload cannot be replicated; skip it instead of throwing.
  if (data === null || typeof data !== 'object') {
    console.warn(`Skipping record ID: ${record.id}: missing or invalid data payload`);
    return;
  }

  const quotedTable = quoteIdentifier(tableName);

  if (record.status === "SOLD") {
    console.log(`Deleting record ID: ${record.mlsid} with status ${record.status} from table: ${tableName}`);
    const deleteListingSql = `
      DELETE FROM ${quotedTable} WHERE \`L_ListingID\` = ?;
    `;
    const deletePhotosSql = `
      DELETE FROM \`photo_processing\` WHERE \`mls_id\` = ?;
    `;
    try {
      await pool.query(deleteListingSql, [data.L_ListingID]);
      console.log(`Deleted record ID: ${record.id} from table: ${tableName}`);
      await pool.query(deletePhotosSql, [data.L_ListingID]);
      console.log(`Deleted photos for MLS ID: ${data.L_ListingID}`);
    } catch (deleteError) {
      console.error(`Error deleting record ID: ${record.id}`, deleteError);
    }
    return;
  }

  // Column/value pairs for the upsert. The record-level status replaces
  // whatever L_StatusCatID the JSON payload carries (only when that key exists).
  const columnEntries = Object.entries(data)
    .filter(([key]) => key !== 'L_Concessions')
    .map(([key, value]): [string, unknown] => [
      key,
      key === 'L_StatusCatID' ? record.status : formatValueForMySQL(key, value),
    ]);

  if (columnEntries.length === 0) {
    // An INSERT with an empty column list is not valid SQL.
    console.warn(`No columns to replicate for record ID: ${record.id}`);
  } else {
    // Column names come from JSON keys, so they are quoted defensively.
    const columns = columnEntries.map(([key]) => quoteIdentifier(key));
    const values = columnEntries.map(([, value]) => value);
    const placeholders = columns.map(() => '?').join(', ');
    const updates = columns.map((column) => `${column} = VALUES(${column})`).join(', ');
    const sql = `
      INSERT INTO ${quotedTable} (${columns.join(', ')})
      VALUES (${placeholders})
      ON DUPLICATE KEY UPDATE ${updates}
    `;
    try {
      await pool.query(sql, values);
      console.log(`Replicated record ID: ${record.mlsid} to table: ${tableName}`);
    } catch (queryError) {
      console.error(`Error executing query for record ID: ${record.id}`, queryError);
    }
  }

  if (photos) {
    const photoInsertSql = `
      INSERT INTO \`photo_processing\` (\`mls_id\`, \`photo_data\`)
      VALUES (?, ?)
      ON DUPLICATE KEY UPDATE \`photo_data\` = VALUES(\`photo_data\`);
    `;
    try {
      await pool.query(photoInsertSql, [data.L_ListingID, JSON.stringify(photos)]);
      console.log(`Replicated photos for MLS ID: ${data.L_ListingID}`);
    } catch (photoError) {
      console.error(`Error inserting photos for MLS ID: ${data.L_ListingID}`, photoError);
    }
  }
}
/**
 * Subscribes to realtime changes on the PocketBase `listings` collection and
 * replicates every changed record into MySQL.
 *
 * Resolves once the subscription is established; the pool created here stays
 * open afterwards so the event callback can keep using it.
 *
 * NOTE(review): the callback does not branch on `event.action`, so a record
 * deleted in PocketBase is passed to `replicateData` like any other change.
 * Confirm whether deletions should remove the MySQL rows instead.
 *
 * @throws Whatever `subscribe` rejects with, after closing the pool.
 */
async function listenForChanges() {
  // Install the `eventsource` package as the global EventSource implementation
  // before the PocketBase client opens its realtime connection.
  global.EventSource = eventsource as unknown as typeof EventSource;
  const pb = new PocketBase(POCKETBASE_URL);
  const pool = mysql.createPool(MYSQL_URI);

  try {
    // Await the subscription so a failure to connect surfaces here rather
    // than as an unhandled rejection, and so the log line below is accurate.
    await pb.collection('listings').subscribe('*', async (event: any) => {
      try {
        console.log(`Event received: ${event.action} for record ID: ${event.record.id}`);
        await replicateData(pool, event.record);
      } catch (eventError) {
        // Nothing awaits this callback's promise, so errors must be handled here.
        console.error(`Error replicating event for record ID: ${event?.record?.id}`, eventError);
      }
    });
  } catch (subscribeError) {
    // Without a subscription the pool has no further use; release it.
    await pool.end().catch((closeError: unknown) => {
      console.error('Error closing MySQL pool:', closeError);
    });
    throw subscribeError;
  }

  console.log('Listening for PocketBase changes...');
}
/**
 * Performs the one-off initial sync: fetches every record of the PocketBase
 * `listings` collection (newest first, in batches of 200) and replicates each
 * one into MySQL.
 *
 * Errors are logged rather than thrown. The MySQL pool created for the sync
 * is always closed before the function resolves.
 */
async function replicateInitialData() {
  const pb = new PocketBase(POCKETBASE_URL);
  const pool = mysql.createPool(MYSQL_URI);
  try {
    console.log('Fetching initial listings from PocketBase...');
    const records = await pb.collection('listings').getFullList({
      batch: 200,
      sort: '-created',
    });
    console.log(`Fetched ${records.length} records.`);
    // Records are replicated one at a time, in the order returned.
    for (const record of records) {
      try {
        await replicateData(pool, record);
      } catch (recordError) {
        // Keep going: one bad record must not abort the rest of the sync.
        console.error(`Error replicating record ID: ${record.id}`, recordError);
      }
    }
  } catch (error) {
    console.error('Error during initial data replication:', error);
  } finally {
    // This pool is local to the initial sync; release its connections.
    try {
      await pool.end();
    } catch (closeError) {
      console.error('Error closing MySQL pool:', closeError);
    }
  }
}
/**
 * Program entry point: runs the initial sync to completion, then starts
 * listening for realtime changes.
 */
async function main() {
  const steps = [replicateInitialData, listenForChanges];
  for (const step of steps) {
    await step();
  }
}

// Log anything that escapes main().
main().catch((error) => console.error(error));
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This example is very specific for my use-case. This shows all the scenarios I ran into and had to encapsulate into a single, efficient program.
The first job is to sync up the data on startup; the second is to listen for changes and fire off replication for each record that changes.
The Pocketbase collection has mlsid, status, class, data(json), and photos(json).
MySQL has four listing tables, one for each value of the class field in Pocketbase; each table's columns correspond to the key/value pairs of the JSON stored in the data field.
A fifth MySQL table, photo_processing, holds the photos JSON but names its listing-ID column differently (mls_id), so the program adjusts for that.