Skip to content

Instantly share code, notes, and snippets.

@nightscape
Created March 31, 2015 23:46
Show Gist options
  • Save nightscape/bb40619279ab3e2c0b4b to your computer and use it in GitHub Desktop.
Save nightscape/bb40619279ab3e2c0b4b to your computer and use it in GitHub Desktop.
Parquet to CSV
import ComparisonChain._
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
import java.io.BufferedReader
import java.io.BufferedWriter
import java.io.Closeable
import java.io.File
import java.io.File
import java.io.FileInputStream
import java.io.FilenameFilter
import java.io.FileOutputStream
import java.io.FileReader
import java.io.FileWriter
import java.io.InputStream
import java.io.IOException
import java.io.OutputStream
import java.io.PrintWriter
import java.util.ArrayList
import java.util.Arrays
import java.util.Collections
import java.util.List
import java.util.regex.Pattern
import org.apache.commons.io.FileUtils
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.file.tfile.Utils.Version
import parquet.column.page.PageReadStore
import parquet.example.data.Group
import parquet.example.data.simple.convert.GroupRecordConverter
import parquet.hadoop.example.GroupReadSupport
import parquet.hadoop.metadata.ParquetMetadata
import parquet.hadoop.ParquetFileReader
import parquet.hadoop.ParquetReader
import parquet.io.ColumnIOFactory
import parquet.io.MessageColumnIO
import parquet.io.RecordReader
import parquet.Log
import parquet.Preconditions
import parquet.schema.MessageType
import parquet.schema.MessageTypeParser
import Version._
/** Small resource-handling helpers shared by the conversion code. */
object Utils {
  /**
   * Closes `res` without letting an `IOException` escape.
   *
   * A `null` argument is ignored. If `close()` throws an `IOException`, a
   * message is printed to standard output and the exception is dropped; any
   * other exception type still propagates.
   *
   * @param res the resource to close; may be `null`
   */
  def closeQuietly(res: Closeable): Unit = {
    try {
      // Option(null) is None, so a null resource is skipped.
      Option(res).foreach(_.close())
    } catch {
      case ioe: IOException =>
        println("Exception closing reader " + res + ": " + ioe.getMessage)
    }
  }
}
/** Empty class; it exists so the companion object can obtain a logger via `classOf`. */
class ConvertUtils

/** Helpers for dumping the records of a Parquet file into a delimited text file. */
object ConvertUtils {
  private val LOG = Log.getLog(classOf[ConvertUtils])

  /** Separator written between the fields of one record (a tab character). */
  val CSV_DELIMITER = "\t"

  /**
   * Reads the whole text file at `path` into a string.
   *
   * Every line read is followed by the value of the `line.separator` system
   * property, including the last one.
   *
   * @param path path of the file to read
   * @return the file content with normalised line endings
   */
  private def readFile(path: String): String = {
    val reader = new BufferedReader(new FileReader(path))
    val content = new StringBuilder()
    try {
      val ls = System.getProperty("line.separator")
      // A Scala assignment evaluates to Unit, so the Java idiom
      // `while ((line = reader.readLine()) != null)` never terminates.
      // Pull lines through an iterator and stop at the first null instead.
      Iterator
        .continually(reader.readLine())
        .takeWhile(_ != null)
        .foreach { line =>
          content.append(line)
          content.append(ls)
        }
    } finally {
      Utils.closeQuietly(reader)
    }
    content.toString
  }

  /**
   * Returns the content of the schema file that accompanies `csvFile`.
   *
   * The schema file is looked up in the same directory; its name is the CSV
   * file's name with the last four characters (the length of ".csv") replaced
   * by ".schema".
   *
   * @param csvFile the CSV file whose schema should be loaded
   * @return the text of the schema file
   */
  def getSchema(csvFile: File): String = {
    val csvName = csvFile.getName
    val schemaName = csvName.substring(0, csvName.length - ".csv".length) + ".schema"
    val schemaFile = new File(csvFile.getParentFile, schemaName)
    readFile(schemaFile.getAbsolutePath)
  }

  /**
   * Writes every record of `parquetFile` to `csvOutputFile`, one record per
   * line with fields separated by [[CSV_DELIMITER]].
   *
   * Preconditions (checked with `Preconditions.checkArgument`): the input name
   * ends with ".parquet", the output name ends with ".csv", and the output
   * file does not exist yet.
   *
   * @param parquetFile   the Parquet file to read
   * @param csvOutputFile the text file to create
   */
  def convertParquetToCSV(parquetFile: File, csvOutputFile: File): Unit = {
    Preconditions.checkArgument(parquetFile.getName.endsWith(".parquet"), "parquet file should have .parquet extension")
    Preconditions.checkArgument(csvOutputFile.getName.endsWith(".csv"), "csv file should have .csv extension")
    Preconditions.checkArgument(!csvOutputFile.exists(), "Output file " + csvOutputFile.getAbsolutePath + " already exists")
    LOG.info("Converting " + parquetFile.getName + " to " + csvOutputFile.getName)

    val parquetFilePath = new Path(parquetFile.toURI())
    val configuration = new Configuration(true)
    val readSupport = new GroupReadSupport()
    val readFooter = ParquetFileReader.readFooter(configuration, parquetFilePath)
    val schema = readFooter.getFileMetaData.getSchema
    readSupport.init(configuration, null, schema)

    val w = new BufferedWriter(new FileWriter(csvOutputFile))
    try {
      // Open the reader inside the writer's try block so the writer is
      // released even when the reader cannot be constructed.
      val reader = new ParquetReader[Group](parquetFilePath, readSupport)
      try {
        // read() returns null once the input is exhausted. Comparing the
        // result of an assignment with null (the Java idiom) is always true
        // in Scala, so iterate explicitly until the first null.
        Iterator
          .continually(reader.read())
          .takeWhile(_ != null)
          .foreach(group => writeGroup(w, group, schema))
        // Flush here so a failing write surfaces as an exception instead of
        // being swallowed by closeQuietly below.
        w.flush()
      } finally {
        // Close the reader on every path, not only after a successful loop.
        Utils.closeQuietly(reader)
      }
    } finally {
      Utils.closeQuietly(w)
    }
  }

  /**
   * Writes one record as a single line: the first value of every top-level
   * field of `schema`, separated by [[CSV_DELIMITER]] and terminated by "\n".
   *
   * A field that holds no value in `g` is written as an empty cell.
   *
   * @param w      destination writer
   * @param g      the record to write
   * @param schema schema whose field count drives the iteration
   */
  private def writeGroup(w: BufferedWriter, g: Group, schema: MessageType): Unit = {
    for (j <- 0 until schema.getFieldCount) {
      if (j > 0) {
        w.write(CSV_DELIMITER)
      }
      // Asking for element 0 of a field without values fails, so only
      // fetch the value when the field occurs at least once in this record.
      if (g.getFieldRepetitionCount(j) > 0) {
        w.write(g.getValueToString(j, 0))
      }
    }
    w.write("\n")
  }
}
@SamsudhinHabeeb
Copy link

Hi nightscape,
I am trying to use your code to convert a Parquet file to CSV, but I am facing some issues when calling convertParquetToCSV with a file path name: the def expects arguments of type File. Can you please explain how I can pass a path instead of a File?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment