Last active
December 14, 2015 19:45
-
-
Save belablotski/453e9b49e250beea7c14 to your computer and use it in GitHub Desktop.
Spark DataFrame tabular representation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Tabular (ASCII-art) representation of a Spark dataset.
 * Idea and initial implementation from http://stackoverflow.com/questions/7539831/scala-draw-table-to-console.
 *
 * Usage:
 *  1. Import source to spark-shell:
 *       set HADOOP_HOME=D:\Java\extra_conf
 *       cd D:\Java\spark-1.4.1-bin-hadoop2.6\bin
 *       spark-shell.cmd --master local[2] --packages com.databricks:spark-csv_2.10:1.3.0 -i /path/to/AvbTabulator.scala
 *  2. Tabulator usage:
 *       import org.apache.spark.sql.hive.HiveContext
 *       val hiveContext = new HiveContext(sc)
 *       val stat = hiveContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", "\t").load("D:\\data\\stats-belablotski.tsv")
 *       stat.registerTempTable("stat")
 *       AvbTabulator(hiveContext.sql("SELECT * FROM stat").take(20))
 *       AvbTabulator(hiveContext.sql("SELECT * FROM stat"))
 *
 * @author Aliaksei Belablotski
 * @author Duncan McGregor
 *
 * Testing:
 *   With "Auto-Mpg" dataset (see LoadAutoMpgDataset.scala):
 *     statAutoMpg.registerTempTable("stat_auto_mpg")
 *     AvbTabulator(sqlContext.sql("SELECT cylinders, `car name`, `model year`, weight, horsepower FROM stat_auto_mpg WHERE cylinders in (3,5) ORDER BY cylinders"), 10, false)
 */
object AvbTabulator {

  /**
   * Format a table of cells as an ASCII table.
   *
   * @param table          Rows of cells; all rows are expected to have the same arity.
   *                       When `isHeaderNeeded` is true, the first row is treated as the header.
   * @param isHeaderNeeded Whether the first row is a header (rendered between separators).
   * @return Formatted table, or "" for an empty table.
   */
  def format(table: Seq[Seq[Any]], isHeaderNeeded: Boolean): String = table match {
    case Seq() => ""
    case _ =>
      // Render every cell up front so that null cells display as empty strings.
      // (Previously null contributed length 0 to column sizing but was rendered
      // as the 4-char string "null" by String.format, breaking alignment.)
      val rendered = table.map(_.map(cell => if (cell == null) "" else cell.toString))
      val colSizes = for (col <- rendered.transpose) yield col.map(_.length).max
      val rows = for (row <- rendered) yield formatRow(row, colSizes)
      formatRows(rowSeparator(colSizes), rows, isHeaderNeeded)
  }

  /**
   * Format an array of Rows (e.g. the result of `DataFrame.take(n)`) as a table
   * without a header line.
   */
  def formatRes(table: Array[org.apache.spark.sql.Row]): String = {
    val res: Seq[Seq[Any]] = table.toSeq.map(_.toSeq)
    format(res, isHeaderNeeded = false)
  }

  /**
   * Format the first `n` rows of a DataFrame as a table.
   *
   * @param df             Data frame.
   * @param n              How many rows to take for tabular printing.
   * @param isHeaderNeeded Whether to print the column names as a header.
   */
  def formatDf(df: org.apache.spark.sql.DataFrame, n: Int = 20, isHeaderNeeded: Boolean = true): String = {
    val data: Seq[Seq[Any]] = df.take(n).toSeq.map(_.toSeq)
    // Prepend the schema row only when it will actually be displayed; previously it
    // was always prepended and formatRows dropped it again when no header was wanted,
    // which also made the (hidden) header names influence the column widths.
    val rows = if (isHeaderNeeded) df.schema.map(_.name).toSeq +: data else data
    format(rows, isHeaderNeeded)
  }

  /** Print an array of Rows in a formatted manner (no header). */
  def apply(table: Array[org.apache.spark.sql.Row]): Unit =
    println(formatRes(table))

  /**
   * Print DataFrame in a formatted manner.
   * @param df             Data frame
   * @param n              How many rows to take for tabular printing
   * @param isHeaderNeeded Whether to print the column names as a header
   */
  def apply(df: org.apache.spark.sql.DataFrame, n: Int = 20, isHeaderNeeded: Boolean = true): Unit =
    println(formatDf(df, n, isHeaderNeeded))

  /**
   * Assemble already-formatted row strings, surrounding separators, and optional header.
   *
   * BUG FIX: the original unconditionally used `rows.tail`, so when no header was
   * requested the first data row was silently dropped (visible via `formatRes`).
   */
  def formatRows(rowSeparator: String, rows: Seq[String], isHeaderNeeded: Boolean): String = {
    val top  = if (isHeaderNeeded) Seq(rowSeparator, rows.head, rowSeparator) else Seq(rowSeparator)
    val body = if (isHeaderNeeded) rows.tail else rows
    (top ++ body :+ rowSeparator).mkString("\n")
  }

  /** Right-align each cell to its column width; an all-empty column renders as "". */
  def formatRow(row: Seq[Any], colSizes: Seq[Int]): String = {
    val cells = for ((item, size) <- row.zip(colSizes))
      yield if (size == 0) "" else ("%" + size + "s").format(item)
    cells.mkString("|", "|", "|")
  }

  /** Build a `+---+---+` separator line matching the column widths. */
  def rowSeparator(colSizes: Seq[Int]): String = colSizes.map("-" * _).mkString("+", "+", "+")

  /*
  def main(args: Array[String]): Unit = {
    println(format(List(List("head1", "head2", "head3"), List("one", "two", "three"), List("four", "five", "six")), true))
  }
  */
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment