Created
December 15, 2011 18:14
-
-
Save jmikola/1482158 to your computer and use it in GitHub Desktop.
Migration script for OrnicarMessageBundle
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Ornicar\MessageBundle\Command; | |
use Doctrine\Common\Persistence\ManagerRegistry; | |
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; | |
use Symfony\Component\Console\Input\InputArgument; | |
use Symfony\Component\Console\Input\InputInterface; | |
use Symfony\Component\Console\Input\InputOption; | |
use Symfony\Component\Console\Output\OutputInterface; | |
/** | |
* Migration script for thread and message documents | |
* | |
* @see https://github.com/ornicar/OrnicarMessageBundle | |
*/ | |
class MongoDBMigrateMetadataCommand extends ContainerAwareCommand | |
{ | |
/** | |
* @var \MongoCollection | |
*/ | |
private $messageCollection; | |
/** | |
* @var \MongoCollection | |
*/ | |
private $threadCollection; | |
/** | |
* @var \MongoCollection | |
*/ | |
private $participantCollection; | |
/** | |
* @var array | |
*/ | |
private $updateOptions; | |
/** | |
* @var \Closure | |
*/ | |
private $printStatusCallback; | |
/** | |
* @see Symfony\Component\Console\Command\Command::configure() | |
*/ | |
protected function configure() | |
{ | |
$this | |
->setName('ornicar:message:mongodb:migrate:metadata') | |
->setDescription('Migrates document hash fields to embedded metadata') | |
->addArgument('participantClass', InputArgument::REQUIRED, 'Participant class') | |
->addOption('safe', null, InputOption::VALUE_OPTIONAL, 'Mongo update option', false) | |
->addOption('fsync', null, InputOption::VALUE_OPTIONAL, 'Mongo update option', false) | |
; | |
} | |
/** | |
* @see Symfony\Bundle\FrameworkBundle\Command\Command::initialize() | |
*/ | |
protected function initialize(InputInterface $input, OutputInterface $output) | |
{ | |
parent::initialize($input, $output); | |
$registry = $this->getContainer()->get('doctrine.odm.mongodb'); | |
$this->messageCollection = $this->getMongoCollectionForClass($registry, $this->getContainer()->getParameter('ornicar_message.message_class')); | |
$this->threadCollection = $this->getMongoCollectionForClass($registry, $this->getContainer()->getParameter('ornicar_message.thread_class')); | |
$this->participantCollection = $this->getMongoCollectionForClass($registry, $input->getArgument('participantClass')); | |
$this->updateOptions = array( | |
'multiple' => false, | |
'safe' => $input->getOption('safe'), | |
'fsync' => $input->getOption('fsync'), | |
); | |
$this->printStatusCallback = function() {}; | |
register_tick_function(array($this, 'printStatus')); | |
} | |
/** | |
* @see Symfony\Component\Console\Command\Command::execute() | |
*/ | |
protected function execute(InputInterface $input, OutputInterface $output) | |
{ | |
$this->migrateMessages($output); | |
$this->migrateThreads($output); | |
$size = memory_get_peak_usage(true); | |
$unit = array('b', 'k', 'm', 'g', 't', 'p'); | |
$output->writeln(sprintf("Peak Memory Usage: <comment>%s</comment>", round($size / pow(1024, ($i = floor(log($size, 1024)))), 2).$unit[$i])); | |
} | |
/** | |
* Migrate message documents | |
* | |
* @param OutputInterface $output | |
*/ | |
private function migrateMessages(OutputInterface $output) | |
{ | |
$cursor = $this->messageCollection->find( | |
array('metadata' => array('$exists' => false)), | |
array('isReadByParticipant' => 1) | |
); | |
$numProcessed = 0; | |
if (!$numTotal = $cursor->count()) { | |
$output->writeln('There are no message documents to migrate.'); | |
return; | |
} | |
$this->printStatusCallback = function() use ($output, &$numProcessed, $numTotal) { | |
$output->write(sprintf("Processed: <info>%d</info> / Complete: <info>%d%%</info>\r", $numProcessed, round(100 * ($numProcessed / $numTotal)))); | |
}; | |
declare(ticks=100) { | |
foreach ($cursor as $message) { | |
$this->messageCollection->update( | |
array('_id' => $message['_id']), | |
array('$set' => array('metadata' => $this->createMessageMetadata($message))), | |
$this->updateOptions | |
); | |
++$numProcessed; | |
} | |
} | |
$output->write(str_repeat(' ', 28 + ceil(log10($numProcessed))) . "\r"); | |
$output->writeln(sprintf('Migrated <info>%d</info> message documents.', $numProcessed)); | |
} | |
/** | |
* Migrate thread documents | |
* | |
* @param OutputInterface $output | |
*/ | |
private function migrateThreads(OutputInterface $output) | |
{ | |
$cursor = $this->threadCollection->find( | |
array('metadata' => array('$exists' => false)), | |
array( | |
'datesOfLastMessageWrittenByOtherParticipant' => 1, | |
'datesOfLastMessageWrittenByParticipant' => 1, | |
'isDeletedByParticipant' => 1, | |
'messages' => 1, | |
) | |
); | |
$numProcessed = 0; | |
if (!$numTotal = $cursor->count()) { | |
$output->writeln('There are no thread documents to migrate.'); | |
return; | |
} | |
$this->printStatusCallback = function() use ($output, &$numProcessed, $numTotal) { | |
$output->write(sprintf("Processed: <info>%d</info> / Complete: <info>%d%%</info>\r", $numProcessed, round(100 * ($numProcessed / $numTotal)))); | |
}; | |
declare(ticks=100) { | |
foreach ($cursor as $thread) { | |
$this->threadCollection->update( | |
array('_id' => $thread['_id']), | |
array('$set' => array( | |
'metadata' => $this->createThreadMetadata($thread), | |
'lastMessageDate' => $this->getLastMessageDate($thread['messages']), | |
)), | |
$this->updateOptions | |
); | |
++$numProcessed; | |
} | |
} | |
$output->write(str_repeat(' ', 28 + ceil(log10($numProcessed))) . "\r"); | |
$output->writeln(sprintf('Migrated <info>%d</info> thread documents.', $numProcessed)); | |
} | |
/** | |
* Create message metadata array | |
* | |
* By default, Mongo will not include "$db" when creating the participant | |
* reference. We'll add that manually to be consistent with Doctrine. | |
* | |
* @param array $message | |
* @return array | |
*/ | |
private function createMessageMetadata(array $message) | |
{ | |
$metadata = array(); | |
foreach ($message['isReadByParticipant'] as $participantId => $isRead) { | |
$metadata[] = array( | |
'isRead' => $isRead, | |
'participant' => $this->participantCollection->createDBRef(array('_id' => new \MongoId($participantId))) + array('$db' => (string) $this->participantCollection->db), | |
); | |
} | |
return $metadata; | |
} | |
/** | |
* Create thread metadata array | |
* | |
* By default, Mongo will not include "$db" when creating the participant | |
* reference. We'll add that manually to be consistent with Doctrine. | |
* | |
* @param array $thread | |
* @return array | |
*/ | |
private function createThreadMetadata(array $thread) | |
{ | |
$metadata = array(); | |
$participantIds = array_keys($thread['datesOfLastMessageWrittenByOtherParticipant'] + $thread['datesOfLastMessageWrittenByParticipant'] + $thread['isDeletedByParticipant']); | |
foreach ($participantIds as $participantId) { | |
$meta = array( | |
'isDeleted' => false, | |
'participant' => $this->participantCollection->createDBRef(array('_id' => new \MongoId($participantId))) + array('$db' => (string) $this->participantCollection->db), | |
); | |
if (isset($thread['isDeletedByParticipant'][$participantId])) { | |
$meta['isDeleted'] = $thread['isDeletedByParticipant'][$participantId]; | |
} | |
if (isset($thread['datesOfLastMessageWrittenByOtherParticipant'][$participantId])) { | |
$meta['lastMessageDate'] = new \MongoDate($thread['datesOfLastMessageWrittenByOtherParticipant'][$participantId]); | |
} | |
if (isset($thread['datesOfLastMessageWrittenByParticipant'][$participantId])) { | |
$meta['lastParticipantMessageDate'] = new \MongoDate($thread['datesOfLastMessageWrittenByParticipant'][$participantId]); | |
} | |
$metadata[] = $meta; | |
} | |
return $metadata; | |
} | |
/** | |
* Get the last message date for the given list of message references | |
* | |
* @param array $messageRefs | |
* @return \MongoDate | |
*/ | |
private function getLastMessageDate(array $messageRefs) | |
{ | |
$lastMessageRef = end($messageRefs); | |
if (false === $lastMessageRef) { | |
return null; | |
} | |
$lastMessage = $this->messageCollection->findOne( | |
array('_id' => $lastMessageRef['$id']), | |
array('createdAt' => 1) | |
); | |
return isset($lastMessage['createdAt']) ? $lastMessage['createdAt'] : null; | |
} | |
/** | |
* Get the MongoCollection for the given class | |
* | |
* @param ManagerRegistry $registry | |
* @param string $class | |
* @return \MongoCollection | |
* @throws \RuntimeException if the class has no DocumentManager | |
*/ | |
private function getMongoCollectionForClass(ManagerRegistry $registry, $class) | |
{ | |
if (!$dm = $registry->getManagerForClass($class)) { | |
throw new \RuntimeException(sprintf('There is no DocumentManager for class "%s"', $class)); | |
} | |
return $dm->getDocumentCollection($class)->getMongoCollection(); | |
} | |
/** | |
* Invokes the print status callback | |
* | |
* Since unregister_tick_function() does not support anonymous functions, it | |
* is easier to register one method (this) and invoke a dynamic callback. | |
*/ | |
public function printStatus() | |
{ | |
call_user_func($this->printStatusCallback); | |
} | |
} |
Author
jmikola
commented
Dec 15, 2011
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment