Skip to content

Instantly share code, notes, and snippets.

@alexy
Created March 27, 2012 20:07
Show Gist options
  • Save alexy/2219800 to your computer and use it in GitHub Desktop.
Save alexy/2219800 to your computer and use it in GitHub Desktop.
Mongo Bulk Load from Casbah
import com.mongodb.casbah._
import org.joda.time.DateTime
import collection.mutable.ArrayBuffer
//...
/** Writes one chunk of quads to the Mongo collection `co.mc` with a single insert call.
  *
  * Every quad `(x, y, r, ri)` becomes one document. Its `_id` is `"r:batch:n"` when
  * `co.batch` is defined and `"r:n"` otherwise, where `n` is `globalIndex` plus the
  * quad's position inside `chunk`.
  *
  * @param co          configuration; supplies the optional batch tag and the target collection
  * @param chunk       the quads to insert
  * @param globalIndex running index assigned to the first quad of `chunk`
  * @return whatever `co.mc.insert` returns
  */
def mongoGobble(co: Conf, chunk: List[Quad], globalIndex: Int) = {
  // Builds the document for a single quad sitting at `offset` within the chunk.
  def toDocument(quad: Quad, offset: Int) = {
    val (first, second, ranker, pairIndex) = quad
    val serial = globalIndex + offset
    val docId = co.batch match {
      case Some(batch) => "%d:%d:%d".format(ranker, batch, serial)
      case _ => "%d:%d".format(ranker, serial)
    }
    Map(
      "_id" -> docId,
      "x" -> first.toString,
      "y" -> second.toString,
      "r" -> ranker.toString,
      "i" -> pairIndex,
      // creation time in epoch milliseconds, stored as a double
      "ca" -> DateTime.now.getMillis.toDouble,
      "t" -> 1)
  }

  val documents = chunk.zipWithIndex map { case (quad, offset) => toDocument(quad, offset) }
  // No progress output here: BufferLoad.add prints the progress dots itself.
  co.mc.insert(documents:_*)
}
/** Collects quads and hands them to `mongoGobble` in chunks of `co.loadChunkSize`.
  *
  * `add` flushes automatically whenever a full chunk has accumulated; `loadRemainder`
  * flushes whatever partial chunk is left. When `co.loadLimit` is set, `add` flushes the
  * remainder and returns `false` as soon as exactly that many quads have been accepted.
  *
  * @param co configuration supplying `loadChunkSize`, `loadProgress` and `loadLimit`
  */
class BufferLoad(co: Conf) {
  /** Number of quads already passed to `mongoGobble`. */
  var globalCount = 0
  /** Number of quads currently pending in `buffer` (always less than `co.loadChunkSize`). */
  var bufCaret = 0
  val buffer = new ArrayBuffer[Quad](initialSize=co.loadChunkSize)

  /** Buffers `elem`, flushing a full chunk when one is complete.
    *
    * @return `false` once `co.loadLimit` quads have been accepted (the pending remainder
    *         is flushed first), `true` otherwise
    */
  def add(elem: Quad): Boolean = {
    buffer += elem
    bufCaret = (bufCaret + 1) % co.loadChunkSize
    if (bufCaret == 0) {
      // A full chunk is ready: write it and start over with an empty buffer.
      mongoGobble(co, buffer.toList, globalCount)
      buffer.clear()
      globalCount += co.loadChunkSize
      // One dot per `co.loadProgress` quads; the original notes say loadProgress
      // should be a multiple of loadChunkSize for this to fire.
      if (globalCount % co.loadProgress == 0)
        Console.err.print(".")
    }
    co.loadLimit match {
      case Some(limit) if globalCount + bufCaret == limit => loadRemainder(); false
      case _ => true
    }
  }

  /** Flushes the pending partial chunk, if any, and leaves the buffer empty.
    *
    * Safe to call repeatedly: with nothing pending it performs no insert.
    */
  def loadRemainder(): Unit = {
    // Skip the write when nothing is pending (e.g. the limit fell exactly on a chunk
    // boundary) so that no insert with zero documents is sent to Mongo.
    if (bufCaret > 0) {
      mongoGobble(co, buffer.slice(0, bufCaret).toList, globalCount)
      globalCount += bufCaret
      bufCaret = 0
    }
    // Drop the flushed quads; otherwise they would be written a second time
    // (under new ids) by the next full-chunk flush if this instance is reused.
    buffer.clear()
  }
}
// Lazily parse the pairs file into RankerPairs records, reading at most co.nPairs lines.
// Each line is expected to be "<ranker>\t<a> <b>,<a> <b>,...": a ranker id, a tab, then
// comma-separated pairs whose two members are separated by a single space.
// NOTE(review): the Source opened here is not closed anywhere in the visible code — confirm
// whether the enclosing method (header not shown) or the caller is responsible for it.
val rankerPairs: Iterator[RankerPairs] = Source.fromFile(co.pairsFile).getLines()
.take(co.nPairs).map { line =>
// TODO extract similar to Scoobi's fromDelimitedTextFile
// p(0) is the ranker id, p(1) the comma-separated pair list
val p = line.split("\t")
val ranker = p(0).toLong
val pairs = p(1).split(",").map{ s =>
// one pair: two space-separated longs
val ab = s.split(" ").map(_.toLong)
(ab(0),ab(1))
}
RankerPairs(ranker, pairs)
}
// The iterator is consumed here; nothing is read from the file before this call.
loadAll(co, rankerPairs)
}
/** Streams every `(x, y)` pair of every ranker into Mongo through a [[BufferLoad]].
  *
  * Each pair is buffered as the quad `(x, y, r, i)`, where `r` is the ranker and `i` the
  * pair's index within that ranker. A "-" is printed to stderr for every
  * `co.rankersProgress`-th ranker. Loading stops as soon as `BufferLoad.add` returns
  * `false` (which it does after flushing, once the load limit is reached); otherwise the
  * progress line is terminated and the remaining partial chunk is flushed at the end.
  *
  * @param co          load configuration
  * @param rankerPairs the records to load; consumed lazily and only as far as needed
  */
def loadAll(co: Conf, rankerPairs: Iterator[RankerPairs]): Unit = {
  val buffer = new BufferLoad(co)
  // `exists` short-circuits on both levels, so iteration stops at the first rejected
  // quad without resorting to a nonlocal `return` from inside the lambdas.
  val limitReached = rankerPairs.zipWithIndex exists { case (RankerPairs(r, pairs), rankerIndex) =>
    if (rankerIndex % co.rankersProgress == 0) Console.err.print("-")
    pairs.iterator.zipWithIndex exists { case ((x, y), i) =>
      !buffer.add((x, y, r, i))
    }
  }
  // When the limit was hit, `add` has already flushed the remainder; nothing more to do.
  if (!limitReached) {
    Console.err.println()
    buffer.loadRemainder()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment