How to partition data for ingestion to Kusto
.show table <tableName> extents
| summarize count() by MinCreatedOn
| render timechart
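The query above lists the table's extents (data shards), counts them by their minimum creation time, and renders the counts as a timechart. It is a quick way to verify that ingested historical data ended up in extents carrying historical creation times rather than the time of ingestion.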
LightIngest.exe "https://ingest-<clusterName>.<region>.kusto.windows.net/;Fed=True" -database:"<databaseName>" -table:"<tableName>" -source:"https://<storageAccountName>.blob.core.windows.net/<containerName><sasToken>" -format:"parquet" -prefix:"<folderPrefix>" -pattern:"*.parquet" -ingestionMappingRef:"<mappingName>" -creationTimePattern:"'day='yyyyMMdd'/'" -dontWait:true
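Note the -creationTimePattern argument: the single-quoted 'day=' and '/' parts are matched literally in the blob path, while the yyyyMMdd part is parsed as a date. LightIngest uses that date as the creation time of the resulting extents, so data written into day=yyyyMMdd folders (as the Spark job below produces) is stamped with its historical creation time instead of the ingestion time.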
import org.apache.spark.sql.functions._
import java.sql.Timestamp

// Just a test class to demonstrate the date-based partitioning. In a real scenario you would
// load the DataFrame from storage instead (whatever the source format is).
// Run in spark-shell or a notebook, where the implicits needed for .toDF() are already in scope.
case class MyTimeseriesDataClass(ts: Timestamp, deviceId: String, signalName: String, signalValue: Double, someints: Seq[Int], somemap: Map[String, Int])

val dataframe = sc.parallelize(Array(
  MyTimeseriesDataClass(Timestamp.valueOf("2014-01-01 23:00:01"), "id1", "temperature", 23.5, Array(1), Map("a" -> 1)),
  MyTimeseriesDataClass(Timestamp.valueOf("2014-01-01 23:00:01"), "id1", "temperature", 13.5, Array(2, 2), Map("b" -> 2)),
  MyTimeseriesDataClass(Timestamp.valueOf("2014-01-02 23:00:01"), "id1", "humidity", 45.3, Array(3, 3, 3), Map("c" -> 3)),
  MyTimeseriesDataClass(Timestamp.valueOf("2014-01-03 23:00:01"), "id2", "temperature", 33.5, Array(4, 4, 4, 4), Map("d" -> 4)),
  MyTimeseriesDataClass(Timestamp.valueOf("2014-01-04 23:00:01"), "id3", "humidity", 41.1, Array(5, 5, 5, 5, 5), Map("e" -> 5))
)).toDF()

// Cap the number of rows per output file so each resulting blob stays a reasonable size for ingestion.
val rowsPerBlob = 750000

dataframe
  .withColumn("day", date_format(col("ts"), "yyyyMMdd")) // derive a yyyyMMdd partition key from the timestamp
  .write
  .option("maxRecordsPerFile", rowsPerBlob)
  .mode("append")
  .partitionBy("day") // one day=<yyyyMMdd> subfolder per day
  .parquet("/mnt/partitionedData")
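If you want to sanity-check the output before ingesting it, the sketch below (not part of the original snippet) reads the partitioned folder back and counts rows per day. It assumes the same /mnt/partitionedData path as above and an interactive spark session, e.g. spark-shell or a Databricks notebook.

// Verification sketch: read the partitioned parquet back (Spark recovers the "day"
// partition column from the day=<yyyyMMdd> folder names) and count rows per day,
// so the per-day counts can be compared with the extent distribution from the
// ".show table ... extents" query above.
val partitioned = spark.read.parquet("/mnt/partitionedData")
partitioned.groupBy("day").count().orderBy("day").show()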