Created
March 30, 2020 09:47
-
-
Save asm0dey/7720e08ebd637a3064542795ff915a44 to your computer and use it in GitHub Desktop.
Reading ORC data not from files, but from input streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Gradle build for a small Kotlin/JVM program that reads ORC data.
plugins {
    id 'org.jetbrains.kotlin.jvm' version '1.3.70'
}

// Project coordinates.
group = 'org.example'
version = '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    // ORC core plus the Hadoop artifacts declared alongside it.
    implementation('org.apache.orc:orc-core:1.6.2')
    implementation('org.apache.hadoop:hadoop-common:3.1.2')
    implementation('org.apache.hadoop:hadoop-hdfs-client:3.1.2')
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
}

// Main and test sources are both compiled to Java 8 bytecode.
compileKotlin {
    kotlinOptions {
        jvmTarget = "1.8"
    }
}

compileTestKotlin {
    kotlinOptions {
        jvmTarget = "1.8"
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.orc.CompressionKind | |
import org.apache.orc.OrcProto | |
import org.apache.orc.OrcProto.* | |
import org.apache.orc.OrcUtils | |
import org.apache.orc.impl.BufferChunk | |
import org.apache.orc.impl.InStream | |
import org.apache.orc.impl.InStream.StreamOptions | |
import org.apache.orc.impl.OrcCodecPool | |
import java.io.FileInputStream | |
import java.io.InputStream | |
import java.nio.ByteBuffer | |
import java.nio.file.Paths | |
/**
 * Entry point: reads the postscript and the footer of the ORC file and prints
 * a few of their fields (footer length, compression kind, postscript length,
 * stripe count and stripe list) to standard output.
 */
@ExperimentalUnsignedTypes
fun main() {
    val postscriptWithLength = readPostScript()
    val postscript = postscriptWithLength.first
    val postscriptLength = postscriptWithLength.second

    listOf(
        "postscript.footerLength = ${postscript.footerLength}",
        "postscript.compression = ${postscript.compression}",
        "postscriptLength = $postscriptLength"
    ).forEach { line -> println(line) }

    // The footer carries the stripe information printed below.
    val footer = readFooter(postscript, postscriptLength)
    val stripesCount = footer?.stripesCount
    val stripesList = footer?.stripesList
    println("footer?.stripesCount = $stripesCount")
    println("footer?.stripesList = $stripesList")
}
/**
 * Converts the protobuf compression enum into the ORC API enum by looking up
 * the constant that has the same name.
 */
private val OrcProto.CompressionKind.orc: CompressionKind
    get() = CompressionKind.valueOf(this.name)
/**
 * Reads and parses the ORC footer that sits directly in front of the postscript.
 *
 * The end of the data is laid out as `[footer][postscript][1-byte postscript length]`,
 * so only that tail is loaded into memory. The footer is decompressed with the codec
 * named in the postscript, or parsed directly when the file is uncompressed.
 *
 * @param postscript parsed postscript; supplies the footer length, the compression
 *   kind and the compression block size
 * @param postscriptLength length of the serialized postscript in bytes
 * @return the parsed footer
 * @throws IllegalStateException if the loaded tail is too short to contain the footer
 */
@ExperimentalUnsignedTypes
fun readFooter(
    postscript: PostScript,
    postscriptLength: Int
): Footer? {
    val footerLength = postscript.footerLength

    // Despite its name, readLastNBytesFromStream(n) skips n leading bytes, so pass
    // the number of bytes in front of the tail rather than the tail size.
    val tailLength = footerLength + postscriptLength + 1
    val bytesToSkip = (file.length() - tailLength).coerceAtLeast(0L)
    val tailBytes = readLastNBytesFromStream(bytesToSkip).use { it.readAllBytes() }

    // Offsets are derived from the buffer size so they stay correct even when
    // more than the tail was loaded.
    val psOffset = tailBytes.size.toLong() - 1 - postscriptLength
    val footerOffset = psOffset - footerLength
    check(footerOffset >= 0) {
        "Read ${tailBytes.size} bytes, too few for a footer of $footerLength bytes " +
            "followed by a postscript of $postscriptLength bytes"
    }

    val compression = postscript.compression.orc
    if (compression == CompressionKind.NONE) {
        // Parse only the footer slice; the buffer also holds the postscript and length byte.
        return Footer.parseFrom(tailBytes.copyOfRange(footerOffset.toInt(), psOffset.toInt()))
    }

    val codec = OrcCodecPool.getCodec(compression)
    try {
        val chunk = BufferChunk(ByteBuffer.wrap(tailBytes), 0)
        val streamOptions = StreamOptions()
            .withCodec(codec)
            .withBufferSize(postscript.compressionBlockSize.toInt())
        val footerStream = InStream.create("footer", chunk, footerOffset, footerLength, streamOptions)
        return Footer.parseFrom(InStream.createCodedInputStream(footerStream))
    } finally {
        // Pooled codecs must be handed back once parsing is finished.
        OrcCodecPool.returnCodec(compression, codec)
    }
}
/** Size of the window, excluding the trailing length byte, that is read to find the postscript. */
private const val MAX_POSTSCRIPT_SIZE = 256

/**
 * Reads the postscript from the end of [file].
 *
 * The very last byte holds the postscript length; the postscript itself sits
 * immediately in front of that byte and is parsed as-is.
 *
 * @return the parsed postscript paired with its serialized length in bytes
 * @throws IllegalStateException if the file is empty or shorter than the
 *   postscript length it declares
 */
private fun readPostScript(): Pair<PostScript, Int> {
    // readLastNBytesFromStream(n) skips n leading bytes; clamp to zero so files
    // shorter than the window are simply read in full.
    val bytesToSkip = (file.length() - (MAX_POSTSCRIPT_SIZE + 1)).coerceAtLeast(0L)
    val tail = readLastNBytesFromStream(bytesToSkip).use { it.readAllBytes() }
    check(tail.isNotEmpty()) { "Cannot read a postscript from an empty file: $file" }

    // The last byte stores the postscript length as an unsigned value; mask it,
    // because Kotlin's Byte is signed and lengths above 127 would turn negative.
    val postscriptLength = tail.last().toInt() and 0xFF
    println("postscriptLength = $postscriptLength")

    val postscriptEnd = tail.size - 1
    val postscriptStart = postscriptEnd - postscriptLength
    check(postscriptStart >= 0) {
        "Postscript length $postscriptLength exceeds the ${tail.size} bytes available"
    }

    // The postscript can't be encoded/compressed by specification, so parse it directly.
    return PostScript.parseFrom(tail.copyOfRange(postscriptStart, postscriptEnd)) to postscriptLength
}
/**
 * Opens a new stream via [inputStream] and advances it past its first [n] bytes.
 *
 * Note that, despite the name, [n] is the number of leading bytes to skip, not
 * the number of trailing bytes to keep; what remains readable is everything
 * after the skipped prefix.
 *
 * @param n number of bytes to skip from the start; values of zero or less skip nothing
 * @return the positioned stream; the caller is responsible for closing it
 */
private fun readLastNBytesFromStream(n: Long): InputStream {
    val stream = inputStream
    try {
        var remaining = n
        // skip() may advance fewer bytes than requested, so repeat until done.
        while (remaining > 0) {
            val skipped = stream.skip(remaining)
            if (skipped <= 0) break // no progress: stop instead of spinning
            remaining -= skipped
        }
    } catch (e: java.io.IOException) {
        // Do not leak the freshly opened stream when positioning fails.
        stream.close()
        throw e
    }
    return stream
}
/** The ORC file being inspected. */
var file = Paths.get("/home/finkel/output.orc").toFile()

/**
 * Opens a new [FileInputStream] over [file] on every access, positioned at the
 * start of the file. Each caller receives its own stream and must close it.
 */
val inputStream: FileInputStream
    get() = FileInputStream(file)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment