Skip to content

Instantly share code, notes, and snippets.

@jkyamog
Created October 15, 2013 21:47
Show Gist options
  • Save jkyamog/6999185 to your computer and use it in GitHub Desktop.
Save jkyamog/6999185 to your computer and use it in GitHub Desktop.
Batch updates on reactive mongo sometimes fails.
val batchSize = 100 // for some reason we need to throttle down the import, putting everything in sometimes causes a future already completed
Async {
fetch("/instrument").map { instruments =>
Logger.debug(s"importing ${instruments.size} instruments")
instruments.grouped(batchSize).foreach { batch =>
res.batchInsert(Enumerator.enumerate(batch))
}
Accepted
}
}
// above seems to be working, but if batchSize is big like more than 500 some batches failes to insert more often
// even wait for future by doing a foldLeft and map like below. if batchSize is about 100 it seems to be ok
val batchSize = 100 // for some reason we need to throttle down the import, putting everything in sometimes causes a future already completed
Async {
fetch("/instrument").flatMap { instruments =>
Logger.debug(s"importing ${instruments.size} instruments")
instruments.grouped(batchSize).foldLeft(Future{Accepted}) { (acc, batch) =>
res.batchInsert(Enumerator.enumerate(batch)).map{ _ =>
Accepted
}
}
}
}
// The straight forward code fails, unless the instruments are less than 500.
Async {
fetch("/instrument").flatMap { instruments =>
Logger.debug(s"importing ${instruments.size} instruments")
res.batchInsert(Enumerator.enumerate(instruments)).map{ _ =>
Accepted
}
}
}
// stack trace below
java.lang.IllegalStateException: Promise already completed.
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:58)
at scala.concurrent.Promise$class.failure(Promise.scala:107)
at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:58)
at scala.concurrent.Future$$anonfun$flatMap$1.liftedTree3$1(Future.scala:283)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:277)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:274)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:29)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment