Last active
November 20, 2020 15:08
-
-
Save nicholasglesmann/c09c7dc11bd23c787ebbf1df179a9da0 to your computer and use it in GitHub Desktop.
Using Salesforce Change Data Capture to listen for CRUD Account/Contact Record events in one Org (sourceConn) and duplicate them in another Org (targetConn)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const jsforce = require('jsforce'); | |
| const myFuncs = require('./myFuncs'); | |
| const sourceUsername = '[email protected]'; | |
| const targetUsername = '[email protected]'; | |
| const externalIdFieldName = 'External_Id__c'; | |
| // The Salesforce streaming topic and position to start from. | |
| const channel = "/data/ChangeEvents"; | |
| const replayId = -1; // receive only new messages without replay | |
| let sourceConn; | |
| let targetConn; | |
| let isFixingTargetAuth = false; | |
| let retryArr = []; | |
| async function start() | |
| { | |
| let sourceAuth = await myFuncs.getSFAuth(sourceUsername, 'https://login.salesforce.com'); | |
| sourceConn = new jsforce.Connection({ | |
| instanceUrl: sourceAuth.instance_url, | |
| accessToken: sourceAuth.access_token | |
| }); | |
| await setUpTargetAuth(); | |
| // Construct a streaming client. | |
| console.log('Constructing client...'); | |
| const fayeClient = sourceConn.streaming.createClient([ | |
| new jsforce.StreamingExtension.Replay(channel, replayId), | |
| new jsforce.StreamingExtension.AuthFailure(() => process.exit(1)) | |
| ]); | |
| console.log('Client constructed!'); | |
| // Subscribe to the channel with a function to receive each message. | |
| const subscription = fayeClient.subscribe(channel, data => { | |
| let sObjName = data.payload.ChangeEventHeader.entityName; | |
| syncDataWithTargetOrg(data.payload, sObjName); | |
| }); | |
| console.log('Waiting for data...'); | |
| } | |
| async function setUpTargetAuth() | |
| { | |
| console.log('Re-authenticating target...'); | |
| let targetAuth = await myFuncs.getSFAuth(targetUsername, 'https://login.salesforce.com'); | |
| targetConn = new jsforce.Connection({ | |
| instanceUrl: targetAuth.instance_url, | |
| accessToken: targetAuth.access_token | |
| }); | |
| console.log('Target authenticated successfully!'); | |
| } | |
| function syncDataWithTargetOrg(payload, sObjName) | |
| { | |
| const CREATE_EVENT = 'CREATE'; | |
| const UPDATE_EVENT = 'UPDATE'; | |
| const DELETE_EVENT = 'DELETE'; | |
| const UPSERT_EVENT = 'UPSERT'; | |
| let changeType = payload.ChangeEventHeader.changeType; | |
| if (changeType === CREATE_EVENT) | |
| { | |
| handleCreateEvent(sObjName, payload); | |
| return; | |
| } | |
| if (changeType === UPDATE_EVENT || changeType === UPSERT_EVENT) | |
| { | |
| handleUpdateAndUpsertEvents(sObjName, payload); | |
| return; | |
| } | |
| if (changeType === DELETE_EVENT) | |
| { | |
| handleDeleteEvent(sObjName, payload); | |
| return; | |
| } | |
| console.error(`Change event type '${changeType}' is not supported.`); | |
| } | |
| // Could go away when CDC enrichment becomes GA | |
| function getExternalIdFromSourceConn(sObjName, sObjSourceId) | |
| { | |
| return new Promise((resolve, reject) => { | |
| let queryString = `SELECT ${externalIdFieldName} FROM ${sObjName} WHERE Id = '${sObjSourceId}' LIMIT 1`; | |
| sourceConn.queryAll(queryString, function (err, result) { | |
| if (err) { return console.error(err); } | |
| resolve(result.records[0][externalIdFieldName]); | |
| }); | |
| }) | |
| .catch(err => { | |
| console.log(err); | |
| }); | |
| } | |
| function createSObjectDataFromPayload(payload) | |
| { | |
| let sObjData = {}; | |
| for (const fieldName in payload) | |
| { | |
| if (shouldIgnore(fieldName)) { continue; } | |
| if (isAddress(fieldName)) | |
| { | |
| formatAddress(sObjData, payload, fieldName); | |
| continue; | |
| } | |
| if (isNameAndObject(fieldName, payload[fieldName])) | |
| { | |
| formatName(sObjData, payload, fieldName); | |
| continue; | |
| } | |
| sObjData[fieldName] = payload[fieldName]; | |
| } | |
| return sObjData; | |
| } | |
| function shouldIgnore(key) | |
| { | |
| const keysToIgnore = ["LastModifiedDate", "CreatedById", "CleanStatus", "CreatedDate", "OwnerId", "LastModifiedById", "ChangeEventHeader"]; | |
| return keysToIgnore.indexOf(key) > -1; | |
| } | |
| function isAddress(key) | |
| { | |
| return key.includes("Address"); | |
| } | |
| function formatAddress(sObjData, payload, fieldName) | |
| { | |
| let addressType = getAddressType(fieldName); | |
| let addressData = payload[fieldName]; | |
| for (const key in addressData) | |
| { | |
| sObjData[addressType + key] = addressData[key]; | |
| } | |
| } | |
| function isNameAndObject(key, value) | |
| { | |
| return key === "Name" && typeof value === "object"; | |
| } | |
| function formatName(sObjData, payload, fieldName) | |
| { | |
| let nameData = payload[fieldName]; | |
| for (const key in nameData) | |
| { | |
| sObjData[key] = nameData[key]; | |
| } | |
| } | |
| function getAddressType(fieldName) | |
| { | |
| return fieldName.replace("Address", ""); | |
| } | |
| function handleCreateEvent(sObjName, payload) | |
| { | |
| let sObjData = createSObjectDataFromPayload(payload); | |
| let callback = getDefaultCallback(sObjName, payload); | |
| createSObjectInTargetOrg(sObjName, sObjData, callback); | |
| } | |
| function handleUpdateAndUpsertEvents(sObjName, payload) | |
| { | |
| let sObjSourceId = getSourceIdFromPayload(payload); | |
| let sObjData = createSObjectDataFromPayload(payload); | |
| let callback = getDefaultCallback(sObjName, payload); | |
| let externalIdPromise = getExternalIdFromSourceConn(sObjName, sObjSourceId); | |
| externalIdPromise.then(externalId => { | |
| sObjData[externalIdFieldName] = externalId; | |
| upsertSObjectInTargetOrg(sObjName, sObjData, callback); | |
| }) | |
| .catch(err => { | |
| console.log(err); | |
| }); | |
| } | |
| function handleDeleteEvent(sObjName, payload) | |
| { | |
| let sObjSourceId = getSourceIdFromPayload(payload); | |
| let externalIdPromise = getExternalIdFromSourceConn(sObjName, sObjSourceId); | |
| let callback = getDefaultCallback(sObjName, payload); | |
| externalIdPromise.then(externalIdOfObjToDelete => { | |
| deleteSObjectInTargetOrg(sObjName, externalIdOfObjToDelete, callback); | |
| }) | |
| .catch(err => { | |
| console.log(err); | |
| }); | |
| } | |
| function createSObjectInTargetOrg(sObjName, sObjData, callback) | |
| { | |
| console.log(`Creating '${sObjName}' in Target Org...`); | |
| return targetConn.sobject(sObjName).create(sObjData, callback); | |
| } | |
| function upsertSObjectInTargetOrg(sObjName, sObjData, callback) | |
| { | |
| console.log(`Upserting '${sObjName}' in Target Org with an External Id of: ${sObjData[externalIdFieldName]}`); | |
| return targetConn.sobject(sObjName).upsert(sObjData, externalIdFieldName, callback); | |
| } | |
| function deleteSObjectInTargetOrg(sObjName, externalIdOfObjToDelete, callback) | |
| { | |
| console.log(`Deleting '${sObjName}' in Target Org with an External Id of: ${externalIdOfObjToDelete}`); | |
| return targetConn.request({ | |
| method: 'DELETE', | |
| url: `/services/data/v50.0/sobjects/${sObjName}/${externalIdFieldName}/${externalIdOfObjToDelete}`, | |
| headers: { | |
| 'content-type': 'application/json', | |
| }, | |
| }, callback); | |
| } | |
| function getSourceIdFromPayload(payload) | |
| { | |
| return payload.ChangeEventHeader.recordIds[0]; | |
| } | |
| function getDefaultCallback(sObjName, payload) | |
| { | |
| return async function (err, ret) { | |
| if (err || (ret && !ret.success)) { | |
| if (err.errorCode === 'INVALID_SESSION_ID' && !isFixingTargetAuth && retryArr.length === 0) | |
| { | |
| console.log('Target auth is expired!'); | |
| isFixingTargetAuth = true; | |
| let retryObj = {}; | |
| retryObj.sObjName = sObjName; | |
| retryObj.payload = payload; | |
| retryArr.push(retryObj); | |
| await setUpTargetAuth(); | |
| isFixingTargetAuth = false; | |
| retry(); | |
| } | |
| else if (err.errorCode === 'INVALID_SESSION_ID' && isFixingTargetAuth) | |
| { | |
| let retryObj = {}; | |
| retryObj.sObjName = sObjName; | |
| retryObj.payload = payload; | |
| retryArr.push(retryObj); | |
| } | |
| // return console.error(err, ret); | |
| } | |
| }; | |
| } | |
| function retry() | |
| { | |
| console.log('Retrying data sync...'); | |
| while (retryArr.length > 0) | |
| { | |
| let retryObj = retryArr.pop(); | |
| syncDataWithTargetOrg(retryObj.payload, retryObj.sObjName); | |
| } | |
| } | |
| start(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| module.exports = { | |
| getSFAuth: function (username, audience) { | |
| const fs = require('fs'); | |
| const privateKey = process.env.PRIVATE_KEY; // Set as Heroku Environment variable | |
| const jwt = require("salesforce-jwt-bearer-token-flow"); // Install using `npm install salesforce-jwt-bearer-token-flow` | |
| return new Promise((resolve, reject) => { | |
| jwt.getToken({ | |
| iss: process.env.CLIENT_ID, // Set as Heroku Environment variable | |
| sub: username, | |
| aud: audience, | |
| privateKey: privateKey | |
| }, | |
| function (err, token) { | |
| resolve(token); | |
| } | |
| ); | |
| }); | |
| } | |
| }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| stream_consumer: node index.js |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Very nice 👍