Created
May 9, 2018 19:39
-
-
Save OElesin/19db48af26fcff206c8a08f1e0fe881e to your computer and use it in GitHub Desktop.
Apache Tika Parquet parser in Scala. Add this class to your project to read Parquet files with Apache Tika.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.{File, FileOutputStream, IOException, InputStream}
import java.nio.file.{Files, StandardCopyOption}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.tools.read.{SimpleReadSupport, SimpleRecord}
import org.apache.tika.exception.TikaException
import org.apache.tika.metadata.HttpHeaders.CONTENT_TYPE
import org.apache.tika.metadata.Metadata
import org.apache.tika.mime.MediaType
import org.apache.tika.parser.{AbstractParser, ParseContext}
import org.apache.tika.sax.XHTMLContentHandler
import org.xml.sax.{ContentHandler, SAXException}
/**
 * Apache Tika parser for Apache Parquet files.
 *
 * The Parquet readers used here operate on a Hadoop `Path`, so the incoming
 * stream is first spooled to a temporary file. The parser then:
 *
 *  - records the content type and the top-level schema column names in the
 *    Tika [[Metadata]], and
 *  - emits the fields of the first record as a single XHTML `<p>` element.
 *
 * Only the first record is emitted as text; later rows are not read.
 */
class TikaParquetParser extends AbstractParser {

  /** Media type reported for Parquet content (`application/x-parquet`). */
  final val PARQUET_RAW = MediaType.application("x-parquet")

  private val SUPPORTED_TYPES: Set[MediaType] = Set(PARQUET_RAW)

  /**
   * @param context parse context (unused)
   * @return the media types handled by this parser, i.e. only [[PARQUET_RAW]]
   */
  override def getSupportedTypes(context: ParseContext): util.Set[MediaType] =
    SUPPORTED_TYPES.asJava

  /**
   * Parses a Parquet document from `stream`.
   *
   * @param stream   raw Parquet bytes; read to the end but not closed here
   * @param handler  SAX handler that receives the XHTML output
   * @param metadata receives the content type, "Total Number of Columns" and
   *                 "Parquet Column Names"
   * @param context  parse context (unused)
   * @throws IOException   if the stream cannot be spooled or the file cannot be read
   * @throws SAXException  if the SAX handler rejects an event
   * @throws TikaException declared for the Tika parser contract
   */
  @throws(classOf[IOException])
  @throws(classOf[SAXException])
  @throws(classOf[TikaException])
  override def parse(stream: InputStream, handler: ContentHandler,
                     metadata: Metadata, context: ParseContext): Unit = {
    val tempFile = File.createTempFile("parquet", ".tmp")
    // Everything after creation runs inside try/finally so the temp file is
    // removed even when copying or footer parsing fails.
    try {
      // Files.copy manages (and closes) its own output stream, so no handle on
      // the temp file is left open; the input stream is left for the caller.
      Files.copy(stream, tempFile.toPath, StandardCopyOption.REPLACE_EXISTING)

      val conf = new Configuration()
      val path = new Path(tempFile.getAbsolutePath)
      val footer = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER)
      // Column names come from the file schema. The footer's key/value metadata
      // holds arbitrary writer properties, not columns.
      val columnNames = footer.getFileMetaData.getSchema.getFields.asScala.map(_.getName)

      metadata.set(CONTENT_TYPE, PARQUET_RAW.toString)
      metadata.set("Total Number of Columns", columnNames.size.toString)
      metadata.set("Parquet Column Names", columnNames.mkString(", "))

      val xhtml = new XHTMLContentHandler(handler, metadata)
      xhtml.startDocument()
      xhtml.startElement("p")
      xhtml.characters(firstRecordText(path))
      xhtml.endElement("p")
      xhtml.endDocument()
    } finally {
      tempFile.delete()
    }
  }

  /**
   * Renders the fields of the first record in `path` as a comma-separated
   * string, in schema order. Returns an empty string when the file has no rows.
   */
  private def firstRecordText(path: Path): String = {
    val reader = ParquetReader.builder(new SimpleReadSupport(), path).build()
    try {
      // read() returns null once the input is exhausted, i.e. immediately for
      // a file without rows.
      Option(reader.read())
        .map(_.getValues.asScala.mkString(", "))
        .getOrElse("")
    } finally {
      reader.close()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment