Skip to content

Instantly share code, notes, and snippets.

@guweigang
Last active December 29, 2015 07:39
Show Gist options
  • Save guweigang/7637250 to your computer and use it in GitHub Desktop.
Save guweigang/7637250 to your computer and use it in GitHub Desktop.
<?php
/* binlog_task.php ---
*
* Filename: binlog_task.php
* Description:
* Author: Gu Weigang
* Maintainer:
* Created: Mon Nov 25 14:22:33 2013 (+0800)
* Version:
* Last-Updated: Mon Nov 25 14:42:36 2013 (+0800)
* By: Gu Weigang
* Update #: 7
*
*/
/* Change Log:
*
*
*/
/* This program is part of "Baidu Darwin PHP Software"; you can redistribute it and/or
* modify it under the terms of the Baidu General Private License as
* published by Baidu Campus.
*
* You should have received a copy of the Baidu General Private License
* along with this program; see the file COPYING. If not, write to
* the Baidu Campus NO.10 Shangdi 10th Street Haidian District, Beijing The People's
* Republic of China, 100085.
*/
/* Code: */
// Tail the MySQL binlog through the "mysqlbinlog" extension, dispatch every
// event to the matching BinlogEvent::on<EventName>() handler, and persist the
// last known binlog filename/position as JSON so a later run can resume.

// Connection settings.
// NOTE(review): the credentials/host part of the DSN below looks mangled
// ("[email protected]"), presumably by e-mail obfuscation when the file was
// published. Replace it with the real user:password@host:port before use.
$dsn = 'mysql://root:[email protected]:3306';
$dbname = 'binlog_test';
$slaveId = 1001;
$changelogPosPath = '/path/to/your/binlog-position/file';

// Read the first line of the position file. A missing file is treated as
// "no saved position" (first run); the event loop creates it on first write.
$changelogPosCont = '';
if (is_file($changelogPosPath)) {
    $changelogPosFD = new \SplFileObject($changelogPosPath, "r");
    $changelogPosCont = trim($changelogPosFD->fgets());
    // Drop the read handle; the event loop reopens the same file for writing.
    $changelogPosFD = null;
} else {
    error_log("Binlog position file not found, starting without a saved position: " . $changelogPosPath);
}

// Decode and validate the saved position before connecting, so a corrupt file
// fails fast instead of passing null values to binlog_set_position().
$changelogPosArr = null;
if (!empty($changelogPosCont)) {
    $changelogPosArr = json_decode($changelogPosCont, true);
    if (!is_array($changelogPosArr)
        || !isset($changelogPosArr['position'], $changelogPosArr['filename'])
    ) {
        error_log("Invalid binlog position file content: " . $changelogPosCont);
        exit(1);
    }
}

try {
    $link = binlog_connect($dsn, $slaveId);
} catch (\Exception $e) {
    // $link is never assigned when binlog_connect() throws, so there is
    // nothing to disconnect here.
    error_log("Mysql binlog server connected failed: " . $e->getMessage());
    exit(1);
}

// set position (only when a saved position was loaded)
if ($changelogPosArr !== null) {
    binlog_set_position($link, $changelogPosArr['position'], $changelogPosArr['filename']);
}

try {
    $quit = false;

    // Build a "constant value => constant name" map from the extension's
    // constants so an event's type_code can be turned back into its name.
    $refBinlog = new \ReflectionExtension("mysqlbinlog");
    $refBinlogConstants = $refBinlog->getConstants();
    $binlogEventTypes = array_flip($refBinlogConstants);

    $binlogEvent = new BinlogEvent();
    $binlogParams = array(
        'filename' => "",
        'position' => "",
        'controlSignal' => false,
    );

    // event loop for binlog events
    while (!$quit) {
        // event
        $event = binlog_wait_for_next_event($link, $dbname);
        if ($event == null) {
            // No event returned: close the connection and leave the loop.
            binlog_disconnect($link);
            break;
        }

        $msgs = array();

        if (!isset($event['type_code'], $binlogEventTypes[$event['type_code']])) {
            // Without a known type name there is no handler to call; raise so
            // the catch below logs the problem and disconnects.
            throw new \UnexpectedValueException("Unknown binlog event type code");
        }
        $eventTypeName = $binlogEventTypes[$event['type_code']];

        // transform BINLOG_WRITE_ROWS_EVENT ---> onWriteRowsEvent
        // (preg_replace_callback replaces the /e modifier, which was
        // deprecated in PHP 5.5 and removed in PHP 7.0)
        $eventMethod = preg_replace_callback(
            "/_(\w)/",
            function ($match) {
                return ucfirst($match[1]);
            },
            strtolower(str_replace("BINLOG_", "ON_", $eventTypeName))
        );

        $return = $binlogEvent->{$eventMethod}($event, $msgs, $binlogParams);

        switch ($return) {
            case BinlogEvent::IGNORE:
                if (!empty($msgs)) {
                    getDI()->get('logger')->debug("Ignore event messages", $msgs);
                    if (isset($msgs['filename'])) $binlogParams['filename'] = $msgs['filename'];
                    if (isset($msgs['position'])) $binlogParams['position'] = $msgs['position'];
                }
                break;
            case BinlogEvent::SUCCESS:
                process($msgs);
                break;
            case BinlogEvent::L_EXIT:
                // Finish this iteration (including the position write below),
                // then stop the loop.
                $quit = true;
                if (!empty($msgs)) {
                    getDI()->get('logger')->debug("Ignore event messages", $msgs);
                    if (isset($msgs['filename'])) $binlogParams['filename'] = $msgs['filename'];
                    if (isset($msgs['position'])) $binlogParams['position'] = $msgs['position'];
                }
                break;
            case BinlogEvent::SIGNAL_QUIT:
                $binlogParams['controlSignal'] = true;
                break;
        }

        // Persist the current position whenever the handler reported one.
        // NOTE(review): the values written come from $binlogParams, which this
        // script itself only updates for IGNORE/L_EXIT; presumably the handler
        // updates it by reference for other results - confirm.
        if (!empty($msgs) && (isset($msgs['filename']) || isset($msgs['position']))) {
            // "w+" truncates the file, so it always holds one JSON document.
            $changelogPosWRFD = new \SplFileObject($changelogPosPath, "w+");
            $changelogPosWRFD->fwrite(json_encode(array(
                'position' => $binlogParams['position'],
                'filename' => $binlogParams['filename'],
            )));
        }
    }
} catch (\Exception $e) {
    error_log("Fatal exception: ". $e->getMessage());
    binlog_disconnect($link);
    exit(1);
}
/**
 * Hook invoked by the event loop with the messages of an event whose handler
 * returned BinlogEvent::SUCCESS.
 *
 * Placeholder implementation: it does nothing with the messages and returns
 * no value.
 *
 * @param mixed $msgs Messages passed in by the event loop.
 * @return void
 */
function process($msgs)
{
    // Intentionally empty.
}
/* binlog_task.php ends here */
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment