Created
March 27, 2012 20:07
-
-
Save alexy/2219800 to your computer and use it in GitHub Desktop.
Mongo Bulk Load from Casbah
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.mongodb.casbah._ | |
import org.joda.time.DateTime | |
import collection.mutable.ArrayBuffer | |
//... | |
/** Writes one chunk of quads to `co.mc` in a single `insert` call.
  *
  * Each quad (x, y, r, ri) becomes one document. Its `_id` is "r:b:n" when
  * `co.batch` is `Some(b)` and "r:n" otherwise. Here n is the quad's position in
  * the overall load: `globalIndex` plus its offset within `chunk`.
  * x, y and r are stored via `toString`, ri is stored unchanged, "ca" holds
  * `DateTime.now.getMillis` as a Double, and "t" is the constant 1.
  *
  * @param co          load configuration; supplies the target collection and optional batch id
  * @param chunk       quads to insert; may be empty
  * @param globalIndex load-wide index of `chunk`'s first element
  * @return whatever `co.mc.insert` returns
  */
def mongoGobble(co: Conf, chunk: List[Quad], globalIndex: Int) = {
  val docs = for (((x, y, r, ri), offset) <- chunk.zipWithIndex) yield {
    val seqNo = globalIndex + offset
    val docId = co.batch.map(b => "%d:%d:%d".format(r, b, seqNo)).getOrElse("%d:%d".format(r, seqNo))
    Map(
      "_id" -> docId,
      "x"   -> x.toString,
      "y"   -> y.toString,
      "r"   -> r.toString,
      "i"   -> ri,
      "ca"  -> DateTime.now.getMillis.toDouble,
      "t"   -> 1)
  }
  // Progress output is not printed here: BufferLoad.add handles it using
  // co.loadProgress, which should be a multiple of co.loadChunkSize.
  co.mc.insert(docs: _*)
}
/** Accumulates quads and hands them to `mongoGobble` in chunks of `co.loadChunkSize`.
  *
  * Prints "." to stderr each time the number of quads flushed in full chunks
  * reaches a multiple of `co.loadProgress`. When `co.loadLimit` is `Some(limit)`,
  * `add` flushes everything buffered and returns false once `limit` quads have
  * been accepted.
  */
class BufferLoad(co: Conf) {
  /** Quads already passed to `mongoGobble`; also the global index of the next chunk. */
  var globalCount = 0
  /** Quads currently buffered. Kept equal to `buffer.size` because the buffer is
    * cleared whenever this is reset to 0. */
  var bufCaret = 0
  val buffer = new ArrayBuffer[Quad](initialSize=co.loadChunkSize)

  /** Buffers `elem` and flushes a chunk once `co.loadChunkSize` quads are pending.
    *
    * @return false if the configured load limit has been reached (the remainder
    *         has then already been flushed), true otherwise
    */
  def add(elem: Quad): Boolean = {
    buffer += elem
    bufCaret = (bufCaret + 1) % co.loadChunkSize
    if (bufCaret == 0) {
      mongoGobble(co, buffer.toList, globalCount)
      buffer.clear()
      globalCount += co.loadChunkSize
      if (globalCount % co.loadProgress == 0)
        Console.err.print(".")
    }
    co.loadLimit match {
      // `>=` rather than `==`: if a caller keeps adding after the limit was hit,
      // the stop signal must keep firing instead of silently loading without bound.
      case Some(limit) if globalCount + bufCaret >= limit => loadRemainder(); false
      case _ => true
    }
  }

  /** Flushes any partially filled chunk and resets the buffer.
    *
    * Skips the insert when nothing is pending, so calling this at a chunk
    * boundary (or twice in a row) does not send an empty insert. The buffer is
    * cleared so later `add` calls cannot re-send already-loaded quads.
    */
  def loadRemainder(): Unit = {
    if (bufCaret > 0) {
      mongoGobble(co, buffer.slice(0, bufCaret).toList, globalCount)
      globalCount += bufCaret
    }
    buffer.clear()
    bufCaret = 0
  }
}
val rankerPairs: Iterator[RankerPairs] = Source.fromFile(co.pairsFile).getLines() | |
.take(co.nPairs).map { line => | |
// TODO extract similar to Scoobi's fromDelimitedTextFile | |
val p = line.split("\t") | |
val ranker = p(0).toLong | |
val pairs = p(1).split(",").map{ s => | |
val ab = s.split(" ").map(_.toLong) | |
(ab(0),ab(1)) | |
} | |
RankerPairs(ranker, pairs) | |
} | |
loadAll(co, rankerPairs) | |
} | |
/** Streams every (x, y) pair of every ranker into Mongo through a `BufferLoad`.
  *
  * Prints "-" to stderr for every `co.rankersProgress`-th ranker, counting from
  * the first one, and a newline when loading ends. Stops as soon as
  * `BufferLoad.add` returns false, i.e. when `co.loadLimit` is reached; rankers
  * after that point are not pulled from `rankerPairs`. Otherwise the trailing
  * partial chunk is flushed at the end.
  *
  * @param co          load configuration
  * @param rankerPairs rankers and their pairs, consumed lazily
  */
def loadAll(co: Conf, rankerPairs: Iterator[RankerPairs]): Unit = {
  val loader = new BufferLoad(co)
  // Lazily flatten to (ranker, x, y, pairIndex). The progress mark for a ranker
  // is printed just before its first pair is requested.
  val indexed = rankerPairs.zipWithIndex.flatMap { case (RankerPairs(r, pairs), rankerIndex) =>
    if (rankerIndex % co.rankersProgress == 0) Console.err.print("-")
    pairs.iterator.zipWithIndex.map { case ((x, y), i) => (r, x, y, i) }
  }
  // A plain loop replaces the non-local `return` from inside nested closures,
  // which Scala 2 implements by throwing NonLocalReturnControl.
  var keepGoing = true
  while (keepGoing && indexed.hasNext) {
    val (r, x, y, i) = indexed.next()
    keepGoing = loader.add((x, y, r, i))
  }
  // End the progress line on both the normal and the early-stop paths.
  Console.err.println()
  // When the limit stopped the loop, add() has already flushed the remainder.
  if (keepGoing) loader.loadRemainder()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment