Created
June 13, 2019 00:12
-
-
Save abcarroll/321fa94566b63e5d8e1b7b0219fcea57 to your computer and use it in GitHub Desktop.
Example of how to use a history table as a "what has changed" system for ETLs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php
// Or, to put it another way:

/**
 * Entry point of the loader: chooses between a full load and an incremental
 * load depending on whether a saved history position exists.
 */
function runLoader() {
    // NOTE(review): queryAndFetch() is not defined in this example; this assumes it
    // returns the single selected value, or null when no row matches — confirm.
    $position = queryAndFetch("SELECT `positionValue` FROM `chat_msg_history_positions` WHERE `positionName` = 'maxChatHistoryId'");
    if ($position === null) {
        // No saved position: nothing has been loaded yet, so load everything.
        runFullLoad();
    } else {
        runIncrementalLoad($position);
    }
}
// So let's runLoader() now, and we'll get a full load. A full load looks something like this:

/**
 * Loads every current chat_msg row into Elastic, then records the history
 * position that this snapshot corresponds to.
 */
function runFullLoad() {
    // Read the history high-water mark BEFORE reading chat_msg. Any change made
    // while the snapshot is being read gets a larger history id, so the next
    // incremental load replays it (presumably the Elastic writes are idempotent
    // upserts/deletes — confirm).
    // COALESCE: MAX() over an empty history table is NULL. Interpolated into the
    // REPLACE below, that would produce invalid SQL ("VALUES('maxChatHistoryId', )").
    $newPosition = (int) queryAndFetch("SELECT COALESCE(MAX(`chat_msg_history_id`), 0) FROM `chat_msg_history`");
    $inputRows = queryAndFetch("SELECT `msg_id`, `msg_sent_at`, `msg_from`, `msg_to`, `msg_body` FROM `chat_msg`");
    insertIntoElastic($inputRows);
    query("REPLACE INTO `chat_msg_history_positions` (`positionName`, `positionValue`) VALUES('maxChatHistoryId', $newPosition)");
}
// And the BIG REVEAL!

/**
 * Applies every chat_msg_history entry recorded since the last saved position,
 * then advances the saved position.
 *
 * @param int|null $position Last processed chat_msg_history_id. Optional for
 *                           backward compatibility: when omitted, it is read
 *                           from chat_msg_history_positions.
 */
function runIncrementalLoad($position = null) {
    if ($position === null) {
        $position = queryAndFetch("SELECT `positionValue` FROM `chat_msg_history_positions` WHERE `positionName` = 'maxChatHistoryId'");
    }
    // Cast to int: the value is interpolated into SQL below, and a NULL/empty value
    // would otherwise produce an invalid or always-empty comparison.
    $position = (int) $position;

    // COALESCE: MAX() over an empty history table is NULL.
    $newPosition = (int) queryAndFetch("SELECT COALESCE(MAX(`chat_msg_history_id`), 0) FROM `chat_msg_history`");

    // Bound the range with $newPosition. Without the upper bound, rows written after
    // $newPosition was read would be processed now AND again on the next run,
    // because the saved position would still be behind them.
    // ORDER BY: changes must be applied in the order they happened (e.g. an insert
    // before its delete).
    // NOTE(review): AUTO_INCREMENT ids are handed out at insert time, not commit time.
    // A concurrent transaction can therefore commit an id <= $newPosition after this
    // read, and that entry would be skipped. Confirm this is acceptable, or read up
    // to a safety lag.
    // (We could also derive $newPosition while iterating, but it doesn't ultimately matter.)
    // Normally we'd JOIN here with chat_msg to get the live data.
    $inputRows = queryAndFetch("SELECT `chat_msg_history_id`, `history_changed_at`, `history_change`, `msg_id` FROM `chat_msg_history` WHERE `chat_msg_history_id` > $position AND `chat_msg_history_id` <= $newPosition ORDER BY `chat_msg_history_id`");

    // The history_change column ('insert' / 'update' / 'delete') says what to do with each row.
    insertUpdateAndDeleteRowsWithinElastic($inputRows);
    query("REPLACE INTO `chat_msg_history_positions` (`positionName`, `positionValue`) VALUES('maxChatHistoryId', $newPosition)");
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Live chat messages: the source table that the ETL reads from.
-- utf8mb4 rather than utf8: MySQL's `utf8` is the 3-byte utf8mb3 encoding and
-- cannot store 4-byte characters such as emoji, which chat text routinely contains.
-- NOTE(review): varchar(255) caps message length; confirm that limit is intended.
CREATE TABLE `chat_msg` (
  `msg_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `msg_sent_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `msg_from` varchar(255) NOT NULL,
  `msg_to` varchar(255) NOT NULL,
  `msg_body` varchar(255) NOT NULL,
  PRIMARY KEY (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Change log for chat_msg: one row per inserted, updated or deleted message,
-- appended by the chat_msg_history_* triggers. The AUTO_INCREMENT id is what
-- the ETL stores as its "how far have I got" position.
CREATE TABLE `chat_msg_history` (
  `chat_msg_history_id` INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
  `history_changed_at`  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `history_change`      ENUM('insert', 'update', 'delete') NOT NULL,
  -- Id of the affected chat_msg row. Not declared as a FOREIGN KEY, which suits
  -- this design: 'delete' entries must keep pointing at rows that no longer exist.
  `msg_id`              INT UNSIGNED NOT NULL
) ENGINE = InnoDB DEFAULT CHARSET = utf8;
-- Change-capture triggers: every row written to chat_msg appends one entry to
-- chat_msg_history. The delete trigger logs OLD.msg_id because a DELETE has no NEW row.
-- NOTE(review): the update trigger logs only NEW.msg_id. If msg_id itself is ever
-- changed, the old id is never logged as deleted.
CREATE TRIGGER `chat_msg_history_insert` AFTER INSERT ON `chat_msg`
FOR EACH ROW
  INSERT INTO `chat_msg_history`
  SET `history_change` = 'insert', `msg_id` = NEW.`msg_id`;

CREATE TRIGGER `chat_msg_history_update` AFTER UPDATE ON `chat_msg`
FOR EACH ROW
  INSERT INTO `chat_msg_history`
  SET `history_change` = 'update', `msg_id` = NEW.`msg_id`;

CREATE TRIGGER `chat_msg_history_delete` AFTER DELETE ON `chat_msg`
FOR EACH ROW
  INSERT INTO `chat_msg_history`
  SET `history_change` = 'delete', `msg_id` = OLD.`msg_id`;
-- Now, I'm going to create this table, although I wouldn't normally store this in a table...
-- It is a bookmark: one named position per consumer, recording the last
-- chat_msg_history_id that has been processed.
CREATE TABLE `chat_msg_history_positions` (
  `positionName`  VARCHAR(255) NOT NULL PRIMARY KEY,  -- e.g. 'maxChatHistoryId'
  `positionValue` INT UNSIGNED NULL DEFAULT NULL      -- a chat_msg_history_id
) ENGINE = InnoDB DEFAULT CHARSET = utf8;
-- Now, let's insert some values into chat...
-- The AFTER INSERT trigger fires once per row, so this logs three 'insert' history entries.
INSERT INTO `chat_msg` (`msg_from`, `msg_to`, `msg_body`)
VALUES
  ('Ben',  'Alex', 'This is our first message in our super awesome chat platform'),
  ('Ben',  'Alex', 'And a second...'),
  ('Alex', 'Ben',  'Looks great!');
-- Okay, now let's set our position... and assume the rows above are already loaded,
-- i.e. we have just run a "full load" (see the pseudo-code examples).
-- COALESCE: MAX() over an empty history table is NULL. A NULL position makes every
-- later `chat_msg_history_id > position` test UNKNOWN, so the filter would match nothing.
REPLACE INTO `chat_msg_history_positions` (`positionName`, `positionValue`)
VALUES ('maxChatHistoryId', (SELECT COALESCE(MAX(`chat_msg_history_id`), 0) FROM `chat_msg_history`));
-- Now, let's insert more stuff (three more 'insert' history entries)...
INSERT INTO `chat_msg` (`msg_from`, `msg_to`, `msg_body`)
VALUES
  ('Ben',  'Alex', 'Some more chat'),
  ('Ben',  'Alex', 'How about them nix?'),
  ('Alex', 'Ben',  'They suck!');

-- ...and even delete! Each removed row is logged as a 'delete' history entry.
-- (A leading-wildcard LIKE scans the whole table; fine for a demo.)
DELETE FROM `chat_msg`
WHERE `msg_body` LIKE '%our first message%';
-- Now, let's fetch the changes that have not been processed yet!
-- ORDER BY: changes must be replayed in the order they happened (an insert before
-- its delete). Without ORDER BY, MySQL does not guarantee any row order.
-- If no position row exists, or its value is NULL, the comparison is UNKNOWN and
-- nothing is returned.
SELECT
  `chat_msg_history_id`,
  `history_changed_at`,
  `history_change`,
  `msg_id`
FROM `chat_msg_history`
WHERE `chat_msg_history_id` > (
  SELECT `positionValue`
  FROM `chat_msg_history_positions`
  WHERE `positionName` = 'maxChatHistoryId'
)
ORDER BY `chat_msg_history_id`;
-- Sample output (msg_id values start at 7 here, presumably because chat_msg had been used before this run):
-- +---------------------+---------------------+----------------+--------+
-- | chat_msg_history_id | history_changed_at  | history_change | msg_id |
-- +---------------------+---------------------+----------------+--------+
-- |                   4 | 2019-06-11 19:49:31 | insert         |     10 |
-- |                   5 | 2019-06-11 19:49:31 | insert         |     11 |
-- |                   6 | 2019-06-11 19:49:33 | insert         |     12 |
-- |                   7 | 2019-06-11 19:49:41 | delete         |      7 |
-- +---------------------+---------------------+----------------+--------+
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment