Skip to content

Instantly share code, notes, and snippets.

@jmikola
Created December 15, 2011 18:14
Show Gist options
  • Save jmikola/1482158 to your computer and use it in GitHub Desktop.
Save jmikola/1482158 to your computer and use it in GitHub Desktop.
Migration script for OrnicarMessageBundle
<?php
namespace Ornicar\MessageBundle\Command;
use Doctrine\Common\Persistence\ManagerRegistry;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
/**
* Migration script for thread and message documents
*
* @see https://github.com/ornicar/OrnicarMessageBundle
*/
class MongoDBMigrateMetadataCommand extends ContainerAwareCommand
{
/**
* @var \MongoCollection
*/
private $messageCollection;
/**
* @var \MongoCollection
*/
private $threadCollection;
/**
* @var \MongoCollection
*/
private $participantCollection;
/**
* @var array
*/
private $updateOptions;
/**
* @var \Closure
*/
private $printStatusCallback;
/**
* @see Symfony\Component\Console\Command\Command::configure()
*/
protected function configure()
{
$this
->setName('ornicar:message:mongodb:migrate:metadata')
->setDescription('Migrates document hash fields to embedded metadata')
->addArgument('participantClass', InputArgument::REQUIRED, 'Participant class')
->addOption('safe', null, InputOption::VALUE_OPTIONAL, 'Mongo update option', false)
->addOption('fsync', null, InputOption::VALUE_OPTIONAL, 'Mongo update option', false)
;
}
/**
* @see Symfony\Bundle\FrameworkBundle\Command\Command::initialize()
*/
protected function initialize(InputInterface $input, OutputInterface $output)
{
parent::initialize($input, $output);
$registry = $this->getContainer()->get('doctrine.odm.mongodb');
$this->messageCollection = $this->getMongoCollectionForClass($registry, $this->getContainer()->getParameter('ornicar_message.message_class'));
$this->threadCollection = $this->getMongoCollectionForClass($registry, $this->getContainer()->getParameter('ornicar_message.thread_class'));
$this->participantCollection = $this->getMongoCollectionForClass($registry, $input->getArgument('participantClass'));
$this->updateOptions = array(
'multiple' => false,
'safe' => $input->getOption('safe'),
'fsync' => $input->getOption('fsync'),
);
$this->printStatusCallback = function() {};
register_tick_function(array($this, 'printStatus'));
}
/**
* @see Symfony\Component\Console\Command\Command::execute()
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->migrateMessages($output);
$this->migrateThreads($output);
$size = memory_get_peak_usage(true);
$unit = array('b', 'k', 'm', 'g', 't', 'p');
$output->writeln(sprintf("Peak Memory Usage: <comment>%s</comment>", round($size / pow(1024, ($i = floor(log($size, 1024)))), 2).$unit[$i]));
}
/**
* Migrate message documents
*
* @param OutputInterface $output
*/
private function migrateMessages(OutputInterface $output)
{
$cursor = $this->messageCollection->find(
array('metadata' => array('$exists' => false)),
array('isReadByParticipant' => 1)
);
$numProcessed = 0;
if (!$numTotal = $cursor->count()) {
$output->writeln('There are no message documents to migrate.');
return;
}
$this->printStatusCallback = function() use ($output, &$numProcessed, $numTotal) {
$output->write(sprintf("Processed: <info>%d</info> / Complete: <info>%d%%</info>\r", $numProcessed, round(100 * ($numProcessed / $numTotal))));
};
declare(ticks=100) {
foreach ($cursor as $message) {
$this->messageCollection->update(
array('_id' => $message['_id']),
array('$set' => array('metadata' => $this->createMessageMetadata($message))),
$this->updateOptions
);
++$numProcessed;
}
}
$output->write(str_repeat(' ', 28 + ceil(log10($numProcessed))) . "\r");
$output->writeln(sprintf('Migrated <info>%d</info> message documents.', $numProcessed));
}
/**
* Migrate thread documents
*
* @param OutputInterface $output
*/
private function migrateThreads(OutputInterface $output)
{
$cursor = $this->threadCollection->find(
array('metadata' => array('$exists' => false)),
array(
'datesOfLastMessageWrittenByOtherParticipant' => 1,
'datesOfLastMessageWrittenByParticipant' => 1,
'isDeletedByParticipant' => 1,
'messages' => 1,
)
);
$numProcessed = 0;
if (!$numTotal = $cursor->count()) {
$output->writeln('There are no thread documents to migrate.');
return;
}
$this->printStatusCallback = function() use ($output, &$numProcessed, $numTotal) {
$output->write(sprintf("Processed: <info>%d</info> / Complete: <info>%d%%</info>\r", $numProcessed, round(100 * ($numProcessed / $numTotal))));
};
declare(ticks=100) {
foreach ($cursor as $thread) {
$this->threadCollection->update(
array('_id' => $thread['_id']),
array('$set' => array(
'metadata' => $this->createThreadMetadata($thread),
'lastMessageDate' => $this->getLastMessageDate($thread['messages']),
)),
$this->updateOptions
);
++$numProcessed;
}
}
$output->write(str_repeat(' ', 28 + ceil(log10($numProcessed))) . "\r");
$output->writeln(sprintf('Migrated <info>%d</info> thread documents.', $numProcessed));
}
/**
* Create message metadata array
*
* By default, Mongo will not include "$db" when creating the participant
* reference. We'll add that manually to be consistent with Doctrine.
*
* @param array $message
* @return array
*/
private function createMessageMetadata(array $message)
{
$metadata = array();
foreach ($message['isReadByParticipant'] as $participantId => $isRead) {
$metadata[] = array(
'isRead' => $isRead,
'participant' => $this->participantCollection->createDBRef(array('_id' => new \MongoId($participantId))) + array('$db' => (string) $this->participantCollection->db),
);
}
return $metadata;
}
/**
* Create thread metadata array
*
* By default, Mongo will not include "$db" when creating the participant
* reference. We'll add that manually to be consistent with Doctrine.
*
* @param array $thread
* @return array
*/
private function createThreadMetadata(array $thread)
{
$metadata = array();
$participantIds = array_keys($thread['datesOfLastMessageWrittenByOtherParticipant'] + $thread['datesOfLastMessageWrittenByParticipant'] + $thread['isDeletedByParticipant']);
foreach ($participantIds as $participantId) {
$meta = array(
'isDeleted' => false,
'participant' => $this->participantCollection->createDBRef(array('_id' => new \MongoId($participantId))) + array('$db' => (string) $this->participantCollection->db),
);
if (isset($thread['isDeletedByParticipant'][$participantId])) {
$meta['isDeleted'] = $thread['isDeletedByParticipant'][$participantId];
}
if (isset($thread['datesOfLastMessageWrittenByOtherParticipant'][$participantId])) {
$meta['lastMessageDate'] = new \MongoDate($thread['datesOfLastMessageWrittenByOtherParticipant'][$participantId]);
}
if (isset($thread['datesOfLastMessageWrittenByParticipant'][$participantId])) {
$meta['lastParticipantMessageDate'] = new \MongoDate($thread['datesOfLastMessageWrittenByParticipant'][$participantId]);
}
$metadata[] = $meta;
}
return $metadata;
}
/**
* Get the last message date for the given list of message references
*
* @param array $messageRefs
* @return \MongoDate
*/
private function getLastMessageDate(array $messageRefs)
{
$lastMessageRef = end($messageRefs);
if (false === $lastMessageRef) {
return null;
}
$lastMessage = $this->messageCollection->findOne(
array('_id' => $lastMessageRef['$id']),
array('createdAt' => 1)
);
return isset($lastMessage['createdAt']) ? $lastMessage['createdAt'] : null;
}
/**
* Get the MongoCollection for the given class
*
* @param ManagerRegistry $registry
* @param string $class
* @return \MongoCollection
* @throws \RuntimeException if the class has no DocumentManager
*/
private function getMongoCollectionForClass(ManagerRegistry $registry, $class)
{
if (!$dm = $registry->getManagerForClass($class)) {
throw new \RuntimeException(sprintf('There is no DocumentManager for class "%s"', $class));
}
return $dm->getDocumentCollection($class)->getMongoCollection();
}
/**
* Invokes the print status callback
*
* Since unregister_tick_function() does not support anonymous functions, it
* is easier to register one method (this) and invoke a dynamic callback.
*/
public function printStatus()
{
call_user_func($this->printStatusCallback);
}
}
@jmikola
Copy link
Author

jmikola commented Dec 15, 2011

$ app/console ornicar:message:mongodb:migrate:metadata "Application\UserBundle\Document\User"
Migrated 33568 message documents.
Migrated 30102 thread documents. 
Peak Memory Usage: 32.5m
$ mongostat
connected to: 127.0.0.1
insert  query update delete getmore command flushes mapped  vsize    res faults locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0      0      0       0       1       0   944m  2.16g   222m      1        0          0       0|0     0|0    62b     1k    21   13:28:13 
     0      0      0      0       0       1       0   944m  2.16g   222m      0        0          0       0|0     0|0    62b     1k    21   13:28:14 
     0      1    609      0       1       2       0   944m  2.16g   241m      0      3.3          0       0|0     0|1   187k     3m    22   13:28:15 
     0      0  18434      0       0       1       0   944m  2.16g   224m      0     89.6          0       0|0     0|1     5m     1k    22   13:28:16 
     0      0  10735      0       0       1       0   944m  2.16g   221m      0     92.5          0       0|0     0|1     3m     1k    22   13:28:17 
     0   2836   6625      0       1       2       0   944m  2.16g   223m      0     52.6          0       0|0     0|1     2m     4m    22   13:28:18 
     0   4688   4688      0       0       1       0   944m  2.16g   233m      0     44.2          0       0|1     0|1     2m   362k    22   13:28:19 
     0   4285   4285      0       0       1       0   944m  2.16g   229m      0     45.6          0       0|1     0|1     2m   331k    22   13:28:20 
     0   3853   3852      0       1       1       1   944m  2.16g   224m      0     28.8          0       0|0     0|0     1m     4m    22   13:28:21 
     0   3664   3664      0       0       1       0   944m  2.16g   224m      0     46.1          0       0|0     0|1     1m   283k    22   13:28:22 
     0   2690   2691      0       0       1       0   944m  2.16g   225m      0     48.4          0       0|0     0|0     1m   208k    22   13:28:23 
     0   4329   4329      0       1       1       0   944m  2.16g   225m      0     45.4          0       0|0     0|0     2m     2m    22   13:28:24 
     0   2503   2503      0       0       1       0   944m  2.16g   223m      0     47.6          0       0|0     0|0     1m   194k    22   13:28:25 
     0   1255   1255      0       0       1       0   944m  2.16g   229m      0     14.8          0       0|0     0|0   636k    97k    21   13:28:26 
     0      0      0      0       0       1       0   944m  2.16g   229m      0        0          0       0|0     0|0    62b     1k    21   13:28:27

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment