How to partition data for ingestion to Kusto
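The overall flow: write the data as date-partitioned Parquet with Spark (one day=yyyyMMdd folder per day), ingest it with LightIngest so each extent is stamped with the original date of its data instead of the ingestion date, and verify the result in Kusto. The query below charts extent creation times for the target table; after the backfill they should line up with the dates in the data.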
.show table <tableName> extents
| summarize count() by MinCreatedOn
| render timechart
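To ingest the partitioned blobs, point LightIngest at the container holding the day=yyyyMMdd folders, replacing the <...> placeholders with your cluster, database, table, storage account, container, SAS token, folder prefix, and ingestion mapping. -creationTimePattern tells Kusto to parse the date out of the day=yyyyMMdd path segment and use it as the extents' creation time, so retention and caching treat backfilled data by its original date; -dontWait:true queues the blobs without waiting for each ingestion to complete.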
LightIngest.exe "https://ingest-<clusterName>.<region>.kusto.windows.net/;Fed=True" -database:"<databaseName>" -table:"<tableName>" -source:"https://<storageAccountName>.blob.core.windows.net/<containerName><sasToken>" -format:"parquet" -prefix:"<folderPrefix>" -pattern:"*.parquet" -ingestionMappingRef:"<mappingName>" -creationTimePattern:"'day='yyyyMMdd'/'" -dontWait:true
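The Spark snippet below produces the day=yyyyMMdd folder layout that the command above ingests.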
import org.apache.spark.sql.functions._
import java.sql.Timestamp
import spark.implicits._ // needed for .toDF() on an RDD of case classes; already in scope in Databricks/spark-shell

// Just a test class to demonstrate the date-based partitioning. In real life you would
// read your DataFrame from storage (in whatever format it is stored).
case class MyTimeseriesDataClass(ts: Timestamp, deviceId: String, signalName: String, signalValue: Double, someints: Seq[Int], somemap: Map[String, Int])

val dataframe = sc.parallelize(Array(
  MyTimeseriesDataClass(Timestamp.valueOf("2014-01-01 23:00:01"), "id1", "temperature", 23.5, Array(1), Map("a" -> 1)),
  MyTimeseriesDataClass(Timestamp.valueOf("2014-01-01 23:00:01"), "id1", "temperature", 13.5, Array(2, 2), Map("b" -> 2)),
  MyTimeseriesDataClass(Timestamp.valueOf("2014-01-02 23:00:01"), "id1", "humidity", 45.3, Array(3, 3, 3), Map("c" -> 3)),
  MyTimeseriesDataClass(Timestamp.valueOf("2014-01-03 23:00:01"), "id2", "temperature", 33.5, Array(4, 4, 4, 4), Map("d" -> 4)),
  MyTimeseriesDataClass(Timestamp.valueOf("2014-01-04 23:00:01"), "id3", "humidity", 41.1, Array(5, 5, 5, 5, 5), Map("e" -> 5))
)).toDF()

// Cap the number of rows per output file so each blob stays a reasonable size for ingestion.
val rowsPerBlob = 750000

dataframe
  .withColumn("day", date_format(col("ts"), "yyyyMMdd")) // derive the partition key from the timestamp
  .write
  .option("maxRecordsPerFile", rowsPerBlob)              // split large partitions into multiple files
  .mode("append")
  .partitionBy("day")                                    // one day=yyyyMMdd folder per day
  .parquet("/mnt/partitionedData")