Skip to content

Instantly share code, notes, and snippets.

@nguyenhiepvan
Forked from vuthaihoc/DbReplicate.php
Created April 13, 2023 05:20
Show Gist options
  • Save nguyenhiepvan/cf129fa29b77ccac4f5d83d5d39db737 to your computer and use it in GitHub Desktop.
Save nguyenhiepvan/cf129fa29b77ccac4f5d83d5d39db737 to your computer and use it in GitHub Desktop.
Laravel DB replicate
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Arr;
use Symfony\Component\Console\Output\ConsoleSectionOutput;
/**
 * Artisan command that copies the default database connection into a second
 * ("another_db") connection that is described on the command line.
 *
 * The target connection starts from the default connection's configuration
 * and overrides only the options that were passed. Tables are copied in the
 * order in which the executed migrations mention them, followed by every other
 * table found in the source schema.
 */
class DbReplicate extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'db:replicate
        {--host= : another db host}
        {--port= : another db port}
        {--user= : another db user}
        {--password= : another db password}
        {--database= : another db name}
        {--migrate_only : Migrate to another db only}
        {--chunk=100 : Chunk size}
    ';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Replicate the default database (schema and data) into another database';

    /**
     * Execute the console command.
     *
     * @return int
     *
     * @throws \InvalidArgumentException when --chunk is not a positive integer
     * @throws \Exception when the target would be the same as the current db
     */
    public function handle()
    {
        $another_db = [
            'host' => $this->option('host'),
            'port' => $this->option('port'),
            'username' => $this->option('user'),
            'password' => $this->option('password'),
            'database' => $this->option('database'),
        ];

        // Options arrive as strings; the paginator needs a positive integer.
        $chunk = (int) $this->option('chunk');
        if ($chunk < 1) {
            throw new \InvalidArgumentException('The --chunk option must be a positive integer');
        }

        // Without a host, port or database the target cannot differ from the source.
        if (($another_db['host'] . $another_db['port'] . $another_db['database']) === '') {
            throw new \Exception('Duplicated with current db');
        }

        // Keep only the options that were actually given so the rest is
        // inherited from the default connection.
        $another_db = array_filter($another_db, static function ($value) {
            return $value !== null && $value !== '';
        });
        $this->setupAnotherConnection($another_db);

        $table_queues = $this->getTablesQueue();

        if ($this->confirm('Run migrate:fresh with another_db ?')) {
            \Artisan::call('migrate:fresh', ['--database' => 'another_db'], $this->getOutput());
        }

        // --migrate_only: stop after the (optional) schema migration, copy no data.
        if ($this->option('migrate_only')) {
            return Command::SUCCESS;
        }

        foreach ($table_queues as $table) {
            $this->replicateTable($table, $chunk);
        }

        return Command::SUCCESS;
    }

    /**
     * Register the "another_db" connection: the default connection's config
     * with the given options merged on top.
     *
     * @param array $options connection settings that override the default ones
     *
     * @throws \Exception when the merged config equals the default connection
     */
    protected function setupAnotherConnection($options)
    {
        $config = config('database.connections.' . config('database.default'));
        $before_hash = md5(json_encode($config));
        $config = array_merge($config, $options);

        // Identical hashes mean the overrides changed nothing: source == target.
        if ($before_hash === md5(json_encode($config))) {
            throw new \Exception('Duplicated with current db');
        }

        \Config::set('database.connections.another_db', $config);
        // Drop any previously resolved instance so the new config is used.
        \DB::purge('another_db');
    }

    /**
     * Build the ordered list of tables to replicate.
     *
     * Starts with "migrations", then appends table names found in
     * Schema::create()/Schema::table() calls of the executed migrations (in
     * run order), and finally any remaining table present in the source schema.
     *
     * @return array list of table names without duplicates
     */
    protected function getTablesQueue(): array
    {
        $migrations = app('migration.repository')->getRan();
        $migration_paths = app('migrator')->paths();
        if (!in_array(database_path('migrations'), $migration_paths, true)) {
            $migration_paths[] = database_path('migrations');
        }
        $migration_files = app('migrator')->getMigrationFiles($migration_paths);

        $tables = ['migrations'];
        foreach ($migrations as $migration) {
            $path = $migration_files[$migration] ?? null;
            if (!$path) {
                $this->warn("Not found " . $migration);
                continue;
            }

            $source = file_get_contents($path);
            if ($source === false) {
                // Unreadable file: treat it like a missing migration.
                $this->warn("Not found " . $migration);
                continue;
            }

            if (preg_match_all("/Schema\:\:(create|table)\([\'\"]([^\'\"]+)[\'\"]/ui", $source, $matches)) {
                foreach ($matches[2] as $table) {
                    if (!in_array($table, $tables, true)) {
                        $tables[] = $table;
                    }
                }
            }
        }

        // Tables that no migration mentions still need to be copied.
        $db_tables = \DB::getDoctrineSchemaManager()->listTableNames();
        foreach ($db_tables as $table) {
            if (!in_array($table, $tables, true)) {
                $tables[] = $table;
            }
        }

        return $tables;
    }

    /**
     * Copy all rows of one table from the default connection to "another_db".
     *
     * Rows are read with cursor pagination ordered by the table's key columns
     * and written page by page, each page inside one transaction on the target.
     * When the target table already holds rows, upsert is used instead of insert.
     *
     * @param string $table_name table to copy
     * @param int $page_size number of rows per page
     *
     * @return int number of rows written (0 when the table was skipped)
     */
    protected function replicateTable($table_name, $page_size = 50)
    {
        $this->info("* Replicating " . $table_name);

        if ($table_name === 'migrations') {
            $this->warn("\tIgnored ");
            return 0;
        }

        // The table must exist on both sides.
        if (!\DB::getDoctrineSchemaManager()->tablesExist($table_name)) {
            $this->warn("\tNo table with name " . $table_name . ' in source db');
            return 0;
        }
        if (!\DB::connection('another_db')->getDoctrineSchemaManager()->tablesExist($table_name)) {
            $this->warn("\tNo table with name " . $table_name . ' in another db');
            return 0;
        }

        $upsert = \DB::connection('another_db')->table($table_name)->exists();
        if ($upsert) {
            $this->warn("\tUsing upsert mode");
        }

        $columns = $this->resolveKeyColumns($table_name);
        $page_size = max(1, (int) $page_size);

        $this->info("\tPrimary : " . implode(",", $columns));
        $this->info("\tCopying " . $table_name);

        $total = \DB::table($table_name)->count();
        $copied = 0;

        // Console sections only exist on real console outputs; fall back to
        // plain lines otherwise (e.g. buffered output).
        $raw_output = $this->getOutput()->getOutput();
        /** @var ConsoleSectionOutput|null $section */
        $section = $raw_output instanceof \Symfony\Component\Console\Output\ConsoleOutputInterface
            ? $raw_output->section()
            : null;

        $cursor = null;
        do {
            // Order by every key column: ordering by only the first column of a
            // composite key makes cursor pagination skip rows sharing that value.
            $query = \DB::table($table_name);
            foreach ($columns as $column) {
                $query->orderBy($column);
            }

            $page = $query->cursorPaginate($page_size, ['*'], 'cursor', $cursor);
            $rows = $page->items();
            $cursor = $page->nextCursor();

            if (count($rows)) {
                \DB::connection('another_db')->transaction(
                    function ($db) use ($rows, $table_name, $columns, $upsert, &$copied) {
                        foreach ($rows as $row) {
                            $row = (array) $row;
                            if ($upsert) {
                                // uniqueBy takes column NAMES, not the row's key values.
                                $db->table($table_name)->upsert($row, $columns);
                            } else {
                                $db->table($table_name)->insert($row);
                            }
                            $copied++;
                        }
                    }
                );
            }

            $progress = "\t\tCopied " . $copied . "/" . $total;
            if ($section) {
                $section->overwrite($progress);
            } else {
                $this->line($progress);
            }
        } while (count($rows) && $cursor);

        return $copied;
    }

    /**
     * Pick the columns that identify a row of the given source table.
     *
     * Preference order: primary key, then the first unique index, then (when
     * neither exists) every column of the table.
     *
     * @param string $table_name table to inspect on the default connection
     *
     * @return array list of column names
     */
    private function resolveKeyColumns($table_name)
    {
        $schema = \DB::getDoctrineSchemaManager();
        $indexes = $schema->listTableIndexes($table_name);

        foreach ($indexes as $index) {
            if ($index->isPrimary()) {
                return $index->getColumns();
            }
        }
        foreach ($indexes as $index) {
            if ($index->isUnique()) {
                return $index->getColumns();
            }
        }

        // NOTE(review): with no unique index, upsert cannot detect conflicts
        // on most drivers; all columns are used as a best-effort key.
        $names = [];
        foreach ($schema->listTableColumns($table_name) as $column) {
            $names[] = $column->getName();
        }

        return $names;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment