Created
March 31, 2015 23:46
-
-
Save nightscape/bb40619279ab3e2c0b4b to your computer and use it in GitHub Desktop.
Parquet to CSV
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ComparisonChain._ | |
import java.io.BufferedInputStream | |
import java.io.BufferedOutputStream | |
import java.io.BufferedReader | |
import java.io.BufferedWriter | |
import java.io.Closeable | |
import java.io.File | |
import java.io.File | |
import java.io.FileInputStream | |
import java.io.FilenameFilter | |
import java.io.FileOutputStream | |
import java.io.FileReader | |
import java.io.FileWriter | |
import java.io.InputStream | |
import java.io.IOException | |
import java.io.OutputStream | |
import java.io.PrintWriter | |
import java.util.ArrayList | |
import java.util.Arrays | |
import java.util.Collections | |
import java.util.List | |
import java.util.regex.Pattern | |
import org.apache.commons.io.FileUtils | |
import org.apache.commons.io.IOUtils | |
import org.apache.hadoop.conf.Configuration | |
import org.apache.hadoop.fs.Path | |
import org.apache.hadoop.io.file.tfile.Utils.Version | |
import parquet.column.page.PageReadStore | |
import parquet.example.data.Group | |
import parquet.example.data.simple.convert.GroupRecordConverter | |
import parquet.hadoop.example.GroupReadSupport | |
import parquet.hadoop.metadata.ParquetMetadata | |
import parquet.hadoop.ParquetFileReader | |
import parquet.hadoop.ParquetReader | |
import parquet.io.ColumnIOFactory | |
import parquet.io.MessageColumnIO | |
import parquet.io.RecordReader | |
import parquet.Log | |
import parquet.Preconditions | |
import parquet.schema.MessageType | |
import parquet.schema.MessageTypeParser | |
import Version._ | |
/** Small I/O helpers shared by the conversion code. */
object Utils {

  /**
   * Closes `res`, tolerating a `null` argument and reporting (rather than rethrowing) any
   * `IOException` raised by `close()`.
   *
   * @param res the resource to close; may be `null`, in which case nothing happens
   */
  def closeQuietly(res: Closeable): Unit =
    try Option(res).foreach(_.close())
    catch {
      case ioe: IOException =>
        println("Exception closing reader " + res + ": " + ioe.getMessage)
    }
}
/** Companion class; exists so the logger below can be keyed on a class literal. */
class ConvertUtils

/**
 * Helpers for converting Parquet files to delimited text.
 *
 * Despite the `CSV` naming, columns are separated by [[CSV_DELIMITER]], which is a tab.
 */
object ConvertUtils {
  private val LOG = Log.getLog(classOf[ConvertUtils])

  /** Separator written between columns. Note: this is a tab, so the output is really TSV. */
  val CSV_DELIMITER = "\t"

  /**
   * Reads a whole text file, re-joining its lines with the platform line separator
   * (a separator is also appended after the last line).
   *
   * Uses the platform default charset, as `FileReader` does.
   */
  private def readFile(path: String): String = {
    val reader = new BufferedReader(new FileReader(path))
    val stringBuilder = new StringBuilder()
    val ls = System.getProperty("line.separator")
    try {
      // `while ((line = reader.readLine()) != null)` does not work in Scala: an assignment
      // evaluates to Unit, which is never null, so that loop never terminates. Pull lines
      // until readLine() signals end-of-stream with null instead.
      Iterator
        .continually(reader.readLine())
        .takeWhile(_ != null)
        .foreach { line =>
          stringBuilder.append(line).append(ls)
        }
    } finally {
      Utils.closeQuietly(reader)
    }
    stringBuilder.toString
  }

  /**
   * Reads the schema text stored next to a CSV file: for `dir/foo.csv` this returns the
   * contents of `dir/foo.schema`.
   *
   * @param csvFile the CSV file whose sibling `.schema` file should be read
   * @return the schema file's contents
   * @throws java.io.FileNotFoundException if the schema file does not exist
   */
  def getSchema(csvFile: File): String = {
    val name = csvFile.getName
    // Drop the last extension (".csv" for the expected input). The previous blind
    // `substring(0, length - 4)` mangled names without a 3-letter extension and threw on
    // names shorter than four characters.
    val dot = name.lastIndexOf('.')
    val baseName = if (dot >= 0) name.substring(0, dot) else name
    val schemaFile = new File(csvFile.getParentFile, baseName + ".schema")
    readFile(schemaFile.getAbsolutePath)
  }

  /**
   * Converts a Parquet file into a delimited text file, one line per top-level record.
   *
   * Each line holds the first value of every top-level field, in schema order, separated by
   * [[CSV_DELIMITER]]. Fields with no value in a record are written as empty strings.
   *
   * @param parquetFile   source file; its name must end in `.parquet`
   * @param csvOutputFile destination file; its name must end in `.csv` and it must not exist yet
   * @throws IllegalArgumentException (via `Preconditions.checkArgument`) if a precondition fails
   */
  def convertParquetToCSV(parquetFile: File, csvOutputFile: File): Unit = {
    Preconditions.checkArgument(parquetFile.getName.endsWith(".parquet"), "parquet file should have .parquet extension")
    Preconditions.checkArgument(csvOutputFile.getName.endsWith(".csv"), "csv file should have .csv extension")
    Preconditions.checkArgument(!csvOutputFile.exists(), "Output file " + csvOutputFile.getAbsolutePath + " already exists")
    LOG.info("Converting " + parquetFile.getName + " to " + csvOutputFile.getName)

    val parquetFilePath = new Path(parquetFile.toURI())
    val configuration = new Configuration(true)
    val readSupport = new GroupReadSupport()
    val readFooter = ParquetFileReader.readFooter(configuration, parquetFilePath)
    val schema = readFooter.getFileMetaData.getSchema
    readSupport.init(configuration, null, schema)

    // Open the input first so that a failure to open it neither leaks a writer nor leaves
    // behind an output file; each resource is closed in its own finally.
    val reader = new ParquetReader[Group](parquetFilePath, readSupport)
    try {
      val w = new BufferedWriter(new FileWriter(csvOutputFile))
      try {
        // reader.read() returns null once every record has been consumed. The Java-style
        // `while ((g = reader.read()) != null)` never stops in Scala (assignment is Unit).
        Iterator
          .continually(reader.read())
          .takeWhile(_ != null)
          .foreach(g => writeGroup(w, g, schema))
        // Flush explicitly so a failing final write surfaces as an exception instead of
        // being swallowed by closeQuietly below.
        w.flush()
      } finally {
        Utils.closeQuietly(w)
      }
    } finally {
      reader.close()
    }
  }

  /**
   * Convenience overload taking filesystem path strings instead of `java.io.File`s;
   * otherwise identical to `convertParquetToCSV(File, File)`.
   *
   * @param parquetFilePath   path of the source `.parquet` file
   * @param csvOutputFilePath path of the `.csv` file to create
   */
  def convertParquetToCSV(parquetFilePath: String, csvOutputFilePath: String): Unit =
    convertParquetToCSV(new File(parquetFilePath), new File(csvOutputFilePath))

  /**
   * Writes one record as a single delimited line terminated by `'\n'`.
   *
   * Only the first value (index 0) of each top-level field is emitted; further values of a
   * repeated field are dropped. A field with no value in this record is written as "".
   *
   * NOTE(review): values are neither quoted nor escaped, so a value containing the delimiter
   * or a newline will break the row structure.
   */
  private def writeGroup(w: BufferedWriter, g: Group, schema: MessageType): Unit = {
    val line = (0 until schema.getFieldCount)
      .map { j =>
        // getValueToString(j, 0) throws when field j has no value in this record (e.g. a
        // null optional column), so check how many values it has first.
        if (g.getFieldRepetitionCount(j) > 0) g.getValueToString(j, 0) else ""
      }
      .mkString(CSV_DELIMITER)
    w.write(line)
    w.write('\n')
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi nightscape,
I am trying to use your code to convert a Parquet file to CSV, but I am running into issues when calling convertParquetToCSV with a file path name. The method expects arguments of type File. Can you please explain how I can pass a path instead of a File?