Skip to content

Instantly share code, notes, and snippets.

@nicholasglesmann
Last active November 20, 2020 15:08
Show Gist options
  • Save nicholasglesmann/c09c7dc11bd23c787ebbf1df179a9da0 to your computer and use it in GitHub Desktop.
Save nicholasglesmann/c09c7dc11bd23c787ebbf1df179a9da0 to your computer and use it in GitHub Desktop.
Using Salesforce Change Data Capture to listen for CRUD Account/Contact Record events in one Org (sourceConn) and duplicate them in another Org (targetConn)
const jsforce = require('jsforce');
const myFuncs = require('./myFuncs');
// Username passed to myFuncs.getSFAuth when authenticating against the source org.
// NOTE(review): the value looks like a scrape-obfuscated placeholder — confirm the real username.
const sourceUsername = '[email protected]';
// Username passed to myFuncs.getSFAuth when authenticating against the target org.
// NOTE(review): same placeholder concern as above — confirm the real username.
const targetUsername = '[email protected]';
// API name of the field used to match records between the two orgs: it is
// queried from the source org and used as the upsert/delete key in the target org.
const externalIdFieldName = 'External_Id__c';
// The Salesforce streaming topic and position to start from.
const channel = "/data/ChangeEvents";
const replayId = -1; // receive only new messages without replay
// jsforce connection to the org whose change events are consumed (assigned in start()).
let sourceConn;
// jsforce connection to the org that receives the changes (assigned in setUpTargetAuth()).
let targetConn;
// True while a callback is awaiting setUpTargetAuth(); lets other failing
// callbacks queue their event instead of starting a second re-authentication.
let isFixingTargetAuth = false;
// Queue of { sObjName, payload } entries waiting to be replayed by retry().
let retryArr = [];
/**
 * Entry point of the consumer.
 *
 * Authenticates against the source org, authenticates against the target org,
 * then subscribes to the change-event channel on the source org and hands
 * every received event to syncDataWithTargetOrg.
 */
async function start()
{
    const { instance_url: instanceUrl, access_token: accessToken } =
        await myFuncs.getSFAuth(sourceUsername, 'https://login.salesforce.com');
    sourceConn = new jsforce.Connection({ instanceUrl, accessToken });

    await setUpTargetAuth();

    // Build the streaming client with a replay extension for the channel and
    // an auth-failure extension whose callback exits the process.
    console.log('Constructing client...');
    const extensions = [
        new jsforce.StreamingExtension.Replay(channel, replayId),
        new jsforce.StreamingExtension.AuthFailure(() => process.exit(1))
    ];
    const fayeClient = sourceConn.streaming.createClient(extensions);
    console.log('Client constructed!');

    // Every message carries the changed entity's name in its ChangeEventHeader.
    fayeClient.subscribe(channel, ({ payload }) => {
        const sObjName = payload.ChangeEventHeader.entityName;
        syncDataWithTargetOrg(payload, sObjName);
    });
    console.log('Waiting for data...');
}
/**
 * (Re-)authenticates against the target org and replaces the module-level
 * `targetConn` with a fresh jsforce connection built from the new token.
 */
async function setUpTargetAuth()
{
    console.log('Re-authenticating target...');
    const { instance_url: instanceUrl, access_token: accessToken } =
        await myFuncs.getSFAuth(targetUsername, 'https://login.salesforce.com');
    targetConn = new jsforce.Connection({ instanceUrl, accessToken });
    console.log('Target authenticated successfully!');
}
/**
 * Routes a change event to the handler that matches its change type.
 *
 * CREATE goes to handleCreateEvent, UPDATE and UPSERT go to
 * handleUpdateAndUpsertEvents, DELETE goes to handleDeleteEvent. Any other
 * change type is reported on stderr and otherwise ignored.
 *
 * @param {object} payload - Change event payload; must contain ChangeEventHeader.changeType.
 * @param {string} sObjName - Name of the sObject the event refers to.
 */
function syncDataWithTargetOrg(payload, sObjName)
{
    const { changeType } = payload.ChangeEventHeader;

    switch (changeType)
    {
        case 'CREATE':
            handleCreateEvent(sObjName, payload);
            return;

        case 'UPDATE':
        case 'UPSERT':
            handleUpdateAndUpsertEvents(sObjName, payload);
            return;

        case 'DELETE':
            handleDeleteEvent(sObjName, payload);
            return;

        default:
            console.error(`Change event type '${changeType}' is not supported.`);
    }
}
// Could go away when CDC enrichment becomes GA
/**
 * Looks up the external-ID field value of a record in the source org.
 *
 * The returned promise now always settles: it rejects when the query fails or
 * when no matching record comes back, instead of hanging forever or resolving
 * with `undefined`. Callers are expected to attach their own `.catch`.
 *
 * @param {string} sObjName - sObject type to query.
 * @param {string} sObjSourceId - Id of the record in the source org.
 * @returns {Promise<*>} Resolves with the record's `externalIdFieldName` value.
 */
function getExternalIdFromSourceConn(sObjName, sObjSourceId)
{
    return new Promise((resolve, reject) => {
        const queryString = `SELECT ${externalIdFieldName} FROM ${sObjName} WHERE Id = '${sObjSourceId}' LIMIT 1`;
        // NOTE(review): queryAll (rather than query) is presumably used so the
        // record can still be found after a DELETE event — confirm.
        sourceConn.queryAll(queryString, (err, result) => {
            if (err)
            {
                reject(err);
                return;
            }

            const record = result && result.records && result.records[0];
            if (!record)
            {
                reject(new Error(`No '${sObjName}' record with Id '${sObjSourceId}' was found in the source org.`));
                return;
            }

            resolve(record[externalIdFieldName]);
        });
    });
}
/**
 * Builds the record to write to the target org from a change event payload.
 *
 * Ignored fields are skipped, address fields are flattened via formatAddress,
 * an object-valued Name is flattened via formatName, and everything else is
 * copied across unchanged.
 *
 * @param {object} payload - Change event payload.
 * @returns {object} Flat field/value map for the target record.
 */
function createSObjectDataFromPayload(payload)
{
    const record = {};

    for (const field in payload)
    {
        if (shouldIgnore(field))
        {
            continue;
        }

        if (isAddress(field))
        {
            formatAddress(record, payload, field);
        }
        else if (isNameAndObject(field, payload[field]))
        {
            formatName(record, payload, field);
        }
        else
        {
            record[field] = payload[field];
        }
    }

    return record;
}
/**
 * Tells whether a payload key must not be copied to the target record.
 *
 * @param {string} key - Payload field name.
 * @returns {boolean} True for the fixed set of fields that are skipped.
 */
function shouldIgnore(key)
{
    switch (key)
    {
        case "LastModifiedDate":
        case "CreatedById":
        case "CleanStatus":
        case "CreatedDate":
        case "OwnerId":
        case "LastModifiedById":
        case "ChangeEventHeader":
            return true;

        default:
            return false;
    }
}
/**
 * Tells whether a field name contains the (case-sensitive) text "Address".
 *
 * @param {string} key - Payload field name.
 * @returns {boolean} True when "Address" occurs anywhere in the name.
 */
function isAddress(key)
{
    const position = key.indexOf("Address");
    return position !== -1;
}
/**
 * Copies an address field from the payload into the target record.
 *
 * An object value is flattened: each of its keys is written as the address
 * type prefix (from getAddressType) followed by the key, e.g.
 * BillingAddress.City becomes BillingCity. A scalar value — a field whose
 * name merely contains "Address" — is copied under its own name instead of
 * being iterated character by character (strings) or dropped
 * (numbers/booleans). null/undefined values leave the record untouched.
 *
 * @param {object} sObjData - Target record being built; mutated in place.
 * @param {object} payload - Change event payload.
 * @param {string} fieldName - Name of the address field in the payload.
 */
function formatAddress(sObjData, payload, fieldName)
{
    const addressData = payload[fieldName];

    if (addressData === null || addressData === undefined)
    {
        return;
    }

    if (typeof addressData !== "object")
    {
        // Not a compound address: keep the scalar under its original field name.
        sObjData[fieldName] = addressData;
        return;
    }

    const addressType = getAddressType(fieldName);
    for (const key in addressData)
    {
        sObjData[addressType + key] = addressData[key];
    }
}
/**
 * Tells whether a field is the "Name" field carrying an object value
 * (note that `typeof null` is also "object").
 *
 * @param {string} key - Payload field name.
 * @param {*} value - Payload field value.
 * @returns {boolean}
 */
function isNameAndObject(key, value)
{
    if (key !== "Name")
    {
        return false;
    }
    return typeof value === "object";
}
/**
 * Flattens an object-valued name field: every key of the name object is
 * copied onto the target record under that same key.
 *
 * @param {object} sObjData - Target record being built; mutated in place.
 * @param {object} payload - Change event payload.
 * @param {string} fieldName - Name of the field holding the name object.
 */
function formatName(sObjData, payload, fieldName)
{
    const nameParts = payload[fieldName];

    for (const partName in nameParts)
    {
        sObjData[partName] = nameParts[partName];
    }
}
/**
 * Derives the address prefix from a field name by removing the first
 * occurrence of "Address" (e.g. "BillingAddress" -> "Billing").
 *
 * @param {string} fieldName - Address field name.
 * @returns {string} The field name without its first "Address".
 */
function getAddressType(fieldName)
{
    const firstAddressToken = /Address/;
    return fieldName.replace(firstAddressToken, "");
}
/**
 * Handles a CREATE change event: converts the payload into a record and
 * creates it in the target org, using the default result callback.
 *
 * @param {string} sObjName - sObject type of the created record.
 * @param {object} payload - Change event payload.
 */
function handleCreateEvent(sObjName, payload)
{
    createSObjectInTargetOrg(
        sObjName,
        createSObjectDataFromPayload(payload),
        getDefaultCallback(sObjName, payload)
    );
}
/**
 * Handles UPDATE and UPSERT change events.
 *
 * Looks up the record's external ID in the source org, adds it to the record
 * built from the payload and upserts that record in the target org. When the
 * lookup yields no external ID the upsert is skipped and reported, because
 * there is no key to match the target record on.
 *
 * @param {string} sObjName - sObject type of the changed record.
 * @param {object} payload - Change event payload.
 */
function handleUpdateAndUpsertEvents(sObjName, payload)
{
    const sObjSourceId = getSourceIdFromPayload(payload);
    const sObjData = createSObjectDataFromPayload(payload);
    const callback = getDefaultCallback(sObjName, payload);

    getExternalIdFromSourceConn(sObjName, sObjSourceId)
        .then(externalId => {
            if (externalId === undefined || externalId === null || externalId === '')
            {
                console.error(`Skipping upsert of '${sObjName}' (${sObjSourceId}): no ${externalIdFieldName} value was found in the source org.`);
                return;
            }

            sObjData[externalIdFieldName] = externalId;
            upsertSObjectInTargetOrg(sObjName, sObjData, callback);
        })
        .catch(err => {
            console.log(err);
        });
}
/**
 * Handles a DELETE change event.
 *
 * Looks up the deleted record's external ID in the source org and deletes the
 * record with that external ID in the target org. When the lookup yields no
 * external ID the delete is skipped and reported, rather than sending a
 * DELETE for a meaningless key.
 *
 * @param {string} sObjName - sObject type of the deleted record.
 * @param {object} payload - Change event payload.
 */
function handleDeleteEvent(sObjName, payload)
{
    const sObjSourceId = getSourceIdFromPayload(payload);
    const externalIdPromise = getExternalIdFromSourceConn(sObjName, sObjSourceId);
    const callback = getDefaultCallback(sObjName, payload);

    externalIdPromise
        .then(externalIdOfObjToDelete => {
            if (externalIdOfObjToDelete === undefined || externalIdOfObjToDelete === null || externalIdOfObjToDelete === '')
            {
                console.error(`Skipping delete of '${sObjName}' (${sObjSourceId}): no ${externalIdFieldName} value was found in the source org.`);
                return;
            }

            deleteSObjectInTargetOrg(sObjName, externalIdOfObjToDelete, callback);
        })
        .catch(err => {
            console.log(err);
        });
}
/**
 * Creates a record in the target org.
 *
 * @param {string} sObjName - sObject type to create.
 * @param {object} sObjData - Field values of the new record.
 * @param {Function} callback - Passed through to the connection's create call.
 * @returns {*} Whatever the connection's create call returns.
 */
function createSObjectInTargetOrg(sObjName, sObjData, callback)
{
    console.log(`Creating '${sObjName}' in Target Org...`);
    const targetSObject = targetConn.sobject(sObjName);
    return targetSObject.create(sObjData, callback);
}
/**
 * Upserts a record in the target org, keyed on the external-ID field.
 *
 * @param {string} sObjName - sObject type to upsert.
 * @param {object} sObjData - Field values, including the external-ID field.
 * @param {Function} callback - Passed through to the connection's upsert call.
 * @returns {*} Whatever the connection's upsert call returns.
 */
function upsertSObjectInTargetOrg(sObjName, sObjData, callback)
{
    const externalId = sObjData[externalIdFieldName];
    console.log(`Upserting '${sObjName}' in Target Org with an External Id of: ${externalId}`);
    const targetSObject = targetConn.sobject(sObjName);
    return targetSObject.upsert(sObjData, externalIdFieldName, callback);
}
/**
 * Deletes the target-org record that carries the given external ID, via a
 * raw REST DELETE on the sObject-by-external-ID resource.
 *
 * The external ID is URL-encoded before being placed in the path, so values
 * containing characters such as "/", "?", "#" or spaces address the intended
 * record instead of producing a malformed URL.
 *
 * @param {string} sObjName - sObject type to delete from.
 * @param {string} externalIdOfObjToDelete - External ID of the record to delete.
 * @param {Function} callback - Passed through to the connection's request call.
 * @returns {*} Whatever the connection's request call returns.
 */
function deleteSObjectInTargetOrg(sObjName, externalIdOfObjToDelete, callback)
{
    console.log(`Deleting '${sObjName}' in Target Org with an External Id of: ${externalIdOfObjToDelete}`);
    const encodedExternalId = encodeURIComponent(externalIdOfObjToDelete);
    return targetConn.request({
        method: 'DELETE',
        url: `/services/data/v50.0/sobjects/${sObjName}/${externalIdFieldName}/${encodedExternalId}`,
        headers: {
            'content-type': 'application/json',
        },
    }, callback);
}
/**
 * Returns the first record Id listed in the event's ChangeEventHeader.
 *
 * @param {object} payload - Change event payload.
 * @returns {string} First entry of ChangeEventHeader.recordIds.
 */
function getSourceIdFromPayload(payload)
{
    const { recordIds } = payload.ChangeEventHeader;
    return recordIds[0];
}
/**
 * Builds the result callback used for every write to the target org.
 *
 * The callback does nothing on success. On an INVALID_SESSION_ID error it
 * queues the event in `retryArr`; if no re-authentication is already in
 * progress it then re-authenticates the target connection and replays the
 * queue via retry(). Any other failure is logged.
 *
 * Fixes over the previous version:
 *  - a failure reported only through `ret.success === false` (with a null
 *    `err`) no longer throws while reading `err.errorCode`;
 *  - failures other than an expired session are logged instead of dropped;
 *  - `isFixingTargetAuth` is always reset, even when re-authentication
 *    throws, so later events are not queued forever;
 *  - an expired-session failure is always queued, not only when the queue
 *    happens to be empty.
 *
 * @param {string} sObjName - sObject type of the event being synced.
 * @param {object} payload - Change event payload, kept for a possible retry.
 * @returns {function(*, *): Promise<void>} Node-style `(err, ret)` callback.
 */
function getDefaultCallback(sObjName, payload)
{
    return async function (err, ret) {
        const failed = Boolean(err) || Boolean(ret && !ret.success);
        if (!failed)
        {
            return;
        }

        const isSessionExpired = Boolean(err) && err.errorCode === 'INVALID_SESSION_ID';
        if (!isSessionExpired)
        {
            console.error(`Failed to sync '${sObjName}' with Target Org:`, err, ret);
            return;
        }

        // Keep the event so it can be replayed once the session is renewed.
        retryArr.push({ sObjName, payload });

        if (isFixingTargetAuth)
        {
            // Another callback is already re-authenticating and will call retry().
            return;
        }

        console.log('Target auth is expired!');
        isFixingTargetAuth = true;
        try
        {
            await setUpTargetAuth();
        }
        catch (authErr)
        {
            // Queued events stay in retryArr for the next successful re-auth.
            console.error('Re-authenticating target failed:', authErr);
            return;
        }
        finally
        {
            isFixingTargetAuth = false;
        }

        retry();
    };
}
/**
 * Replays every queued event against the target org and empties the queue.
 *
 * Events are replayed oldest-first (FIFO) so they are re-submitted in the
 * order they originally arrived; the previous `pop()` replayed them in
 * reverse.
 */
function retry()
{
    console.log('Retrying data sync...');
    while (retryArr.length > 0)
    {
        const { payload, sObjName } = retryArr.shift();
        syncDataWithTargetOrg(payload, sObjName);
    }
}
// Launch the consumer. start() is async, so a failure during authentication
// or subscription is reported and ends the process explicitly instead of
// surfacing as an unhandled promise rejection.
start().catch(err => {
    console.error('Failed to start the change-event stream:', err);
    process.exit(1);
});
module.exports = {
getSFAuth: function (username, audience) {
const fs = require('fs');
const privateKey = process.env.PRIVATE_KEY; // Set as Heroku Environment variable
const jwt = require("salesforce-jwt-bearer-token-flow"); // Install using `npm install salesforce-jwt-bearer-token-flow`
return new Promise((resolve, reject) => {
jwt.getToken({
iss: process.env.CLIENT_ID, // Set as Heroku Environment variable
sub: username,
aud: audience,
privateKey: privateKey
},
function (err, token) {
resolve(token);
}
);
});
}
};
stream_consumer: node index.js
@banderson5144
Copy link

Very nice 👍

@banderson5144
Copy link

@msrivastav13 Updated to use JWT

@msrivastav13
Copy link

@branderson this is very cool!!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment