Skip to content

Instantly share code, notes, and snippets.

@abcarroll
Created June 13, 2019 00:12
Show Gist options
  • Save abcarroll/321fa94566b63e5d8e1b7b0219fcea57 to your computer and use it in GitHub Desktop.
Save abcarroll/321fa94566b63e5d8e1b7b0219fcea57 to your computer and use it in GitHub Desktop.
Example on how to use a history table as a "what has changed" system for ETLs
<?php
// Or, to put it another way
/**
 * Entry point of the ETL: chooses between a full and an incremental load.
 *
 * Looks up the stored 'maxChatHistoryId' position. When no position has been
 * saved yet, everything is loaded from scratch via runFullLoad(); otherwise
 * runIncrementalLoad() processes only the changes recorded since then.
 *
 * NOTE: queryAndFetch() is a pseudo-code helper; it is assumed to return null
 * when the query matches no row -- confirm against the real implementation.
 */
function runLoader() {
    // Only the stored value is needed, so select that column explicitly.
    $position = queryAndFetch("SELECT `positionValue` FROM `chat_msg_history_positions` WHERE `positionName` = 'maxChatHistoryId'");

    if ($position === null) {
        // No saved position: nothing has been loaded so far.
        runFullLoad();
    } else {
        // runIncrementalLoad() is declared without parameters and reads the
        // position itself; the argument only documents the intent here.
        runIncrementalLoad($position);
    }
}
// So let's runLoader() now, and we'll get a full load. Full load should be something like this:
/**
 * Full load: pushes every row of `chat_msg` to Elastic and stores the
 * history position from which later incremental loads continue.
 *
 * The history high-water mark is read BEFORE the `chat_msg` snapshot, so
 * changes made while the snapshot is being loaded are picked up again by the
 * next incremental run instead of being skipped.
 */
function runFullLoad() {
    // MAX() over an empty history table is NULL; COALESCE keeps the REPLACE
    // statement below well-formed and stores 0 ("nothing processed yet").
    $newPosition = queryAndFetch("SELECT COALESCE(MAX(`chat_msg_history_id`), 0) FROM `chat_msg_history`");

    $inputRows = queryAndFetch("SELECT * FROM `chat_msg`");
    insertIntoElastic($inputRows);

    // Persist the position only after the rows have been handed to Elastic.
    query("REPLACE INTO `chat_msg_history_positions` (`positionName`, `positionValue`) VALUES('maxChatHistoryId', $newPosition)");
}
// And the BIG REVEAL!
/**
 * Incremental load: replays the history rows recorded after the stored
 * position and then advances the position.
 *
 * Only rows in the window (stored position, current max id] are processed.
 * The upper bound keeps rows that arrive after the max id was read out of
 * this run; they fall into the next run's window, so each history row is
 * handled exactly once.
 */
function runIncrementalLoad() {
    // New high-water mark. COALESCE keeps the later statements well-formed
    // even if the history table is empty (MAX() would be NULL).
    $newPosition = queryAndFetch("SELECT COALESCE(MAX(`chat_msg_history_id`), 0) FROM `chat_msg_history`");

    // The position is interpolated as a scalar below, so fetch just the value.
    $position = queryAndFetch("SELECT `positionValue` FROM `chat_msg_history_positions` WHERE `positionName` = 'maxChatHistoryId'");

    // We could also extract $newPosition here as we iterate, but it really
    // doesn't matter ultimately.
    // Normally we'd JOIN here with chat_msg to get the live data.
    // ORDER BY: changes must be applied in the order they were recorded.
    $inputRows = queryAndFetch("SELECT * FROM `chat_msg_history` WHERE `chat_msg_history_id` > $position AND `chat_msg_history_id` <= $newPosition ORDER BY `chat_msg_history_id`");

    // Remember the `history_change` column has to be taken into account here
    // (insert / update / delete).
    insertUpdateAndDeleteRowsWithinElastic($inputRows);

    query("REPLACE INTO `chat_msg_history_positions` (`positionName`, `positionValue`) VALUES('maxChatHistoryId', $newPosition)");
}
-- Live chat messages: the source table whose changes are tracked.
-- Charset is utf8mb4 rather than utf8: in MySQL `utf8` is an alias for the
-- 3-byte utf8mb3 encoding, which cannot store 4-byte characters (e.g. emoji)
-- that chat text routinely contains. No string column is indexed here, so
-- the wider encoding does not affect any key length.
CREATE TABLE `chat_msg` (
    `msg_id`      int(10) unsigned NOT NULL AUTO_INCREMENT,
    `msg_sent_at` timestamp        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `msg_from`    varchar(255)     NOT NULL,
    `msg_to`      varchar(255)     NOT NULL,
    `msg_body`    varchar(255)     NOT NULL,
    PRIMARY KEY (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Change log for `chat_msg`: one row per insert, update or delete.
-- The auto-increment id doubles as the position an ETL consumer compares
-- against to find changes it has not processed yet.
CREATE TABLE `chat_msg_history` (
    `chat_msg_history_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
    `history_changed_at`  timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `history_change`      enum('insert','update','delete') NOT NULL,  -- kind of change
    `msg_id`              int(10) unsigned NOT NULL,                  -- affected chat_msg row
    PRIMARY KEY (`chat_msg_history_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Log every newly inserted message as an 'insert' change.
CREATE TRIGGER `chat_msg_history_insert`
    AFTER INSERT ON `chat_msg`
    FOR EACH ROW
        INSERT INTO `chat_msg_history` (`history_change`, `msg_id`)
        VALUES ('insert', NEW.`msg_id`);
-- Log every modified message as an 'update' change (keyed by the row's id
-- after the update).
CREATE TRIGGER `chat_msg_history_update`
    AFTER UPDATE ON `chat_msg`
    FOR EACH ROW
        INSERT INTO `chat_msg_history` (`history_change`, `msg_id`)
        VALUES ('update', NEW.`msg_id`);
-- Log every removed message as a 'delete' change. Only OLD is available in a
-- DELETE trigger, so the id is taken from the row as it was before removal.
CREATE TRIGGER `chat_msg_history_delete`
    AFTER DELETE ON `chat_msg`
    FOR EACH ROW
        INSERT INTO `chat_msg_history` (`history_change`, `msg_id`)
        VALUES ('delete', OLD.`msg_id`);
-- Now, I'm going to create this table but I wouldn't normally store this in a table...
-- Named bookmarks into `chat_msg_history`: one row per consumer position,
-- holding the highest history id that consumer has already processed.
CREATE TABLE `chat_msg_history_positions` (
    `positionName`  varchar(255)     NOT NULL,      -- bookmark key, e.g. 'maxChatHistoryId'
    `positionValue` int(10) unsigned DEFAULT NULL,  -- last processed chat_msg_history_id
    PRIMARY KEY (`positionName`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Now, let's insert some values into chat ...
-- Three initial messages; each INSERT also fires the insert trigger and adds
-- one row to `chat_msg_history`.
INSERT INTO `chat_msg`
    (`msg_from`, `msg_to`, `msg_body`)
VALUES
    ('Ben', 'Alex', 'This is our first message in our super awesome chat platform');

INSERT INTO `chat_msg`
    (`msg_from`, `msg_to`, `msg_body`)
VALUES
    ('Ben', 'Alex', 'And a second...');

INSERT INTO `chat_msg`
    (`msg_from`, `msg_to`, `msg_body`)
VALUES
    ('Alex', 'Ben', 'Looks great!');
-- Okay, now let's set our position, assuming the rows above are already loaded -- i.e. we just ran a "full load" (see the pseudo-code examples)
-- Store the current history high-water mark as the consumer's position.
-- COALESCE guards the empty-history case: MAX() over no rows is NULL, and a
-- NULL position makes every later `chat_msg_history_id > positionValue`
-- comparison match nothing, silently hiding all changes.
REPLACE INTO `chat_msg_history_positions` (`positionName`, `positionValue`)
VALUES (
    'maxChatHistoryId',
    (SELECT COALESCE(MAX(`chat_msg_history_id`), 0) FROM `chat_msg_history`)
);
-- Now, let's insert more stuff
-- Three further messages written after the position was saved; their history
-- rows are the ones an incremental load has to pick up.
INSERT INTO `chat_msg`
    (`msg_from`, `msg_to`, `msg_body`)
VALUES
    ('Ben', 'Alex', 'Some more chat');

INSERT INTO `chat_msg`
    (`msg_from`, `msg_to`, `msg_body`)
VALUES
    ('Ben', 'Alex', 'How about them nix?');

INSERT INTO `chat_msg`
    (`msg_from`, `msg_to`, `msg_body`)
VALUES
    ('Alex', 'Ben', 'They suck!');
-- Even delete!
-- Remove the very first message; the delete trigger logs one 'delete' row per
-- removed message.
DELETE FROM `chat_msg`
WHERE `msg_body` LIKE '%our first message%';
-- Now, let's fetch updates that have not been processed!
-- History rows recorded after the saved position, i.e. changes not yet
-- processed. Columns are listed explicitly, and the rows are ordered by id so
-- that changes are replayed in the order they happened (e.g. an insert before
-- the delete of the same message).
SELECT
    `chat_msg_history_id`,
    `history_changed_at`,
    `history_change`,
    `msg_id`
FROM `chat_msg_history`
WHERE `chat_msg_history_id` > (
    SELECT `positionValue`
    FROM `chat_msg_history_positions`
    WHERE `positionName` = 'maxChatHistoryId'
)
ORDER BY `chat_msg_history_id`;
-- +---------------------+---------------------+----------------+--------+
-- | chat_msg_history_id | history_changed_at  | history_change | msg_id |
-- +---------------------+---------------------+----------------+--------+
-- |                   4 | 2019-06-11 19:49:31 | insert         |     10 |
-- |                   5 | 2019-06-11 19:49:31 | insert         |     11 |
-- |                   6 | 2019-06-11 19:49:33 | insert         |     12 |
-- |                   7 | 2019-06-11 19:49:41 | delete         |      7 |
-- +---------------------+---------------------+----------------+--------+
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment