Created August 29, 2012 20:33
-
-
Save hoffrocket/3518541 to your computer and use it in GitHub Desktop.
Mongo streaming dump in Scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.EOFException
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream

import com.mongodb.DBCallback
import com.mongodb.DBCollection
import com.mongodb.DBCursor
import com.mongodb.DBDecoder
import com.mongodb.DBDecoderFactory
import com.mongodb.DBObject
import com.mongodb.DefaultDBDecoder
import com.mongodb.Mongo

import org.bson.BSONCallback
import org.bson.BSONObject
/**
 * Dumps a MongoDB collection as raw BSON without building DBObjects.
 *
 * A custom DBDecoder is installed on the cursor. Instead of parsing each
 * returned document, it copies the document's bytes verbatim to an
 * OutputStream, so the output is a plain concatenation of BSON documents.
 */
object StreamingDump {

  /** Smallest valid BSON document: the 4-byte size prefix plus the trailing 0x00 terminator. */
  private val MinDocumentSize = 5

  /**
   * Decoder factory whose decoders all write into the same stream.
   *
   * @param os destination for the raw BSON bytes of every decoded document
   */
  class StreamingDBDecoderFactory(os: OutputStream) extends DBDecoderFactory {
    def create(): DBDecoder = new StreamingDBDecoder(os)
  }

  /**
   * A DBDecoder that copies each BSON document from its input to `os` and
   * returns `null` instead of a decoded object.
   *
   * Only `decode(InputStream, DBCollection)` is implemented; every other
   * decoder entry point deliberately throws UnsupportedOperationException.
   *
   * @param os destination for the raw BSON bytes
   */
  class StreamingDBDecoder(os: OutputStream) extends DBDecoder {
    /** Scratch buffer, reused across documents, for copying document bodies in chunks. */
    val buffer: Array[Byte] = new Array[Byte](4096)

    /**
     * Copies exactly one BSON document from `in` to `os`.
     *
     * A BSON document begins with its total size in bytes (including the
     * size field itself) as a little-endian int32. That prefix is written
     * through unchanged, followed by the remaining `size - 4` bytes.
     *
     * @param in         stream positioned at the start of a BSON document
     * @param collection unused
     * @return always `null`; the document is never materialized
     * @throws EOFException if `in` ends before the whole document was read
     * @throws IOException  if the size prefix is below the BSON minimum of 5 bytes
     */
    def decode(in: InputStream, collection: DBCollection): DBObject = {
      val b0 = in.read()
      val b1 = in.read()
      val b2 = in.read()
      val b3 = in.read()
      // read() yields -1 at end of stream; OR-ing makes any -1 negative.
      if ((b0 | b1 | b2 | b3) < 0)
        throw new EOFException()

      // Little-endian int32 size prefix.
      val size = (b3 << 24) | (b2 << 16) | (b1 << 8) | b0
      // Reject a corrupt size prefix before writing anything to `os`.
      if (size < MinDocumentSize)
        throw new IOException("Invalid BSON document size: " + size)

      os.write(b0)
      os.write(b1)
      os.write(b2)
      os.write(b3)

      // The 4 prefix bytes are already consumed and written.
      var remaining = size - 4
      while (remaining > 0) {
        val n = in.read(buffer, 0, math.min(remaining, buffer.length))
        // -1 means the stream ended mid-document; without this check the
        // counter would grow and os.write(buffer, 0, -1) would throw.
        if (n < 0)
          throw new EOFException(
            "Stream ended with " + remaining + " byte(s) of the BSON document unread")
        os.write(buffer, 0, n)
        remaining -= n
      }
      null
    }

    // Only decode(InputStream, DBCollection) above is implemented; the
    // remaining DBDecoder entry points deliberately throw.

    def getDBCallback(collection: DBCollection): DBCallback =
      throw new UnsupportedOperationException("Not implemented")

    def decode(bytes: Array[Byte], collection: DBCollection): DBObject =
      throw new UnsupportedOperationException("Not implemented")

    def readObject(bytes: Array[Byte]): BSONObject =
      throw new UnsupportedOperationException("Not implemented")

    def readObject(in: InputStream): BSONObject =
      throw new UnsupportedOperationException("Not implemented")

    def decode(bytes: Array[Byte], callback: BSONCallback): Int =
      throw new UnsupportedOperationException("Not implemented")

    def decode(in: InputStream, callback: BSONCallback): Int =
      throw new UnsupportedOperationException("Not implemented")
  }

  /**
   * Streams every document of collection `test` in database `test` on
   * 127.0.0.1:27017 into an in-memory buffer.
   *
   * It then re-parses that buffer with the driver's DefaultDBDecoder and
   * prints the resulting documents as a sanity check.
   */
  def main(args: Array[String]): Unit = {
    val mongo = new Mongo("127.0.0.1", 27017)
    val os = new ByteArrayOutputStream()
    try {
      val coll: DBCollection = mongo.getDB("test").getCollection("test")
      val cursor: DBCursor = coll.find()
      try {
        cursor.setDecoderFactory(new StreamingDBDecoderFactory(os))
        // Iterating drives the decoder: every fetched document is copied
        // into `os`, and the null values returned by next() are discarded.
        while (cursor.hasNext()) cursor.next()
      } finally {
        cursor.close()
      }
    } finally {
      mongo.close()
    }

    // Sanity check: decode the dumped bytes back with the driver's own decoder.
    val decoder = new DefaultDBDecoder
    val is = new ByteArrayInputStream(os.toByteArray())
    // Check for remaining bytes *before* each read, so the last document is
    // kept and an empty dump yields an empty list instead of an EOF error.
    val results = Iterator.continually(is)
      .takeWhile(_.available() > 0)
      .map(in => decoder.readObject(in))
      .toList
    println(results)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.
Note: using this code to write to a file seems to be faster than the mongodump command-line tool.