Skip to content

Instantly share code, notes, and snippets.

@hfleitas
Created February 15, 2024 01:55
Show Gist options
  • Save hfleitas/cbc40a13f99c9fb30b9e5b86906b1f05 to your computer and use it in GitHub Desktop.
Save hfleitas/cbc40a13f99c9fb30b9e5b86906b1f05 to your computer and use it in GitHub Desktop.
IOT-HOL.kql
#connect cluster('adxpm10774.eastus').database('IoTAnalytics')

// Incoming data lands in the StageIoTRawData table, so look at that first.
.show table StageIoTRawData

// Sample a handful of staged rows.
StageIoTRawData
| take 10
// An update policy flattens the staged data and appends it to the Thermostats table.
.show table Thermostats

// The update policy itself. Expand the Policy column in the result to see the details.
.show table Thermostats policy update

// The function used by the update policy.
.show function ExtractThermostatData

// Sample a handful of flattened rows.
Thermostats
| take 10

// Earliest and latest enqueue time in the table, plus the earliest ingestion time.
Thermostats
| summarize
    MinDate   = min(EnqueuedTimeUTC),
    MaxDate   = max(EnqueuedTimeUTC),
    MinIngest = min(ingestion_time())

// Row count. There should be a little over 30K rows in this table.
Thermostats
| count
// What is the average temperature, in 1-minute bins, for one device during November 2022?
// The range is half-open (>= Nov 1, < Dec 1) so that all of Nov 30 is included;
// `between (.. datetime(2022-11-30))` would stop at midnight at the start of Nov 30.
Thermostats
| where EnqueuedTimeUTC >= datetime(2022-11-01) and EnqueuedTimeUTC < datetime(2022-12-01)
| where DeviceId == '5858b065-9ac2-4703-8e8f-c84eed9b616c'
| summarize avg(Temp) by bin(EnqueuedTimeUTC, 1m)
| render timechart
// Same idea, but for all thermostats, aggregated into 1-hour bins over November 2022.
// The range is half-open (>= Nov 1, < Dec 1) so that all of Nov 30 is included.
Thermostats
| where EnqueuedTimeUTC >= datetime(2022-11-01) and EnqueuedTimeUTC < datetime(2022-12-01)
| summarize avg(Temp) by bin(EnqueuedTimeUTC, 1h), DeviceId
| render timechart
// Is any data missing?
// make-series creates a series of the specified aggregated values along the specified axis.
// No `default=` is given here, so empty 1-minute bins take make-series' default fill value.
let windowStart = datetime(2022-01-01);
let windowEnd = datetime(2022-01-04);
Thermostats
| where DeviceId == '637086755190714287'
    and EnqueuedTimeUTC between (windowStart .. windowEnd)
| make-series AvgTemp = avg(Temp) on EnqueuedTimeUTC from windowStart to windowEnd step 1m
| render timechart
// How can the missing values be filled?
// series_fill_linear() performs linear interpolation of missing values in a series.
let windowStart = datetime(2022-01-01);
let windowEnd = datetime(2022-01-04);
Thermostats
| where DeviceId == '637086755190714287'
    and EnqueuedTimeUTC between (windowStart .. windowEnd)
// Empty bins are emitted as null so that they can be interpolated below.
| make-series AvgTemp = avg(Temp) default = real(null) on EnqueuedTimeUTC from windowStart to windowEnd step 1m
| project EnqueuedTimeUTC, NoGapsTemp = series_fill_linear(AvgTemp)
| render timechart
// What will the temperature be for the next hour?
// The data is historical, so 2022-01-31 is treated as the present.
let start = datetime(2022-01-29);
let end = datetime(2022-01-31);
Thermostats
| where EnqueuedTimeUTC between (start .. end)
| where DeviceId == '637086755190714287'
// The series runs 1h past `end`, leaving 60 one-minute slots for the forecast.
| make-series AvgTemp=avg(Temp) default=real(null) on EnqueuedTimeUTC from start to end+1h step 1m
| extend NoGapsTemp=series_fill_linear(AvgTemp)
| project EnqueuedTimeUTC, NoGapsTemp
// 60 points at a 1m step = one hour of forecast.
| extend forecast = series_decompose_forecast(NoGapsTemp, 60)
| render timechart with(title='Forecasting the next hour by Time Series Decomposition')
// Let's forecast out 12 hours.
let start = datetime(2022-01-29);
let end = datetime(2022-01-31);
Thermostats
| where EnqueuedTimeUTC between (start .. end)
| where DeviceId == '637086755190714287'
// The series runs 12h past `end`, leaving 720 one-minute slots for the forecast.
| make-series AvgTemp=avg(Temp) default=real(null) on EnqueuedTimeUTC from start to end+12h step 1m
| extend NoGapsTemp=series_fill_linear(AvgTemp)
| project EnqueuedTimeUTC, NoGapsTemp
// 720 points at a 1m step = twelve hours of forecast.
| extend forecast = series_decompose_forecast(NoGapsTemp, 720)
| render timechart with(title='Forecasting the next 12 hours by Time Series Decomposition')
// Are there any anomalies for this device?
let windowStart = datetime(2022-01-29);
let windowEnd = datetime(2022-01-31);
Thermostats
| where DeviceId == '637086755190714287'
    and EnqueuedTimeUTC between (windowStart .. windowEnd)
| make-series AvgTemp = avg(Temp) default = real(null) on EnqueuedTimeUTC from windowStart to windowEnd step 1m
// Interpolate the gaps, then keep only the time axis and the gap-free series.
| project EnqueuedTimeUTC, NoGapsTemp = series_fill_linear(AvgTemp)
| extend anomalies = series_decompose_anomalies(NoGapsTemp, 1)
| render anomalychart with (anomalycolumns = anomalies)
// Let's make it less sensitive: same query, with the second argument raised from 1 to 1.5.
let windowStart = datetime(2022-01-29);
let windowEnd = datetime(2022-01-31);
Thermostats
| where DeviceId == '637086755190714287'
    and EnqueuedTimeUTC between (windowStart .. windowEnd)
| make-series AvgTemp = avg(Temp) default = real(null) on EnqueuedTimeUTC from windowStart to windowEnd step 1m
// Interpolate the gaps, then keep only the time axis and the gap-free series.
| project EnqueuedTimeUTC, NoGapsTemp = series_fill_linear(AvgTemp)
| extend anomalies = series_decompose_anomalies(NoGapsTemp, 1.5)
| render anomalychart with (anomalycolumns = anomalies)
// Which anomalies should I focus on across all devices?
let start = datetime(2022-01-29);
let end = datetime(2022-01-31);
Thermostats
| where EnqueuedTimeUTC between (start .. end)
// One series per device.
| make-series AvgTemp=avg(Temp) default=real(null) on EnqueuedTimeUTC from start to end step 1m by DeviceId
| extend NoGapsTemp=series_fill_linear(AvgTemp)
| project EnqueuedTimeUTC, DeviceId, NoGapsTemp
| extend anomalies = series_decompose_anomalies(NoGapsTemp, 1.5)
// Expand the parallel arrays back into one row per (device, minute). The explicit
// `to typeof(...)` gives the expanded columns concrete types instead of `dynamic`,
// so the filter below is an ordinary numeric comparison.
| mv-expand EnqueuedTimeUTC to typeof(datetime), anomalies to typeof(double), NoGapsTemp to typeof(real)
// Keep only points flagged 1.
// NOTE(review): per the series_decompose_anomalies docs the flag can also be -1; use `!= 0` if those should be listed too.
| where anomalies == 1
//Let's shift focus to the current data being ingested into ADX.
//We have the ability to query out to blob, sql, cosmos, adt (azure digital twins), etc.
//The provisioning process created a function that queries out to ADT, so let's look at that.
//This function allows us to get the thermostats at certain offices (Atlanta, Dallas, or Seattle).
// The metadata includes the floor the thermostat is on.
.show function GetDevicesbyStore
//Here is an example of the function definition, kept commented out for reference only.
//Note that the ADT endpoint below won't match yours.
//.create-or-alter function with (folder = "Analytics/IoT", skipvalidation = "true") GetDevicesbyStore(Office:string)
//{
// let ADTendpoint = "https://digitaltwinpm24.api.eus.digitaltwins.azure.net";
// let ADTquery = strcat("SELECT T.$dtId as Office, F.$dtId as Floor, D.$dtId as DeviceId FROM DIGITALTWINS T JOIN F RELATED T.officecontainsfloors JOIN D RELATED F.floorcontainsdevices where T.$dtId='", Office, "'");
// evaluate azure_digital_twins_query_request(ADTendpoint, ADTquery)
// | project Office=tostring(Office), Floor=tostring(Floor), DeviceId=tostring(DeviceId)
//}
//Let's get the devices in the Dallas office.
GetDevicesbyStore('Dallas')
//Now let's join that with ADX data and chart the results.
//The floor from ADT is combined with the DeviceId to add context to each series.
GetDevicesbyStore('Dallas')
| join kind=inner (Thermostats
    // Last hour of telemetry, averaged per device per minute.
    | where EnqueuedTimeUTC >= ago(1h)
    | summarize Temp=avg(Temp) by DeviceId, AggTime=bin(EnqueuedTimeUTC, 1m)
) on DeviceId
// Relabel each series as "<Floor>-<DeviceId>".
| extend DeviceId=strcat(Floor, '-', DeviceId)
// Name the converted column explicitly: the render step below references `Temp` by name,
// so it should not depend on how an unnamed `todouble(Temp)` expression gets auto-named.
| project Temp=todouble(Temp), AggTime, DeviceId
| render timechart with (ycolumns = Temp, series=DeviceId)
// Materialized views are commonly used to:
//   a. Store aggregates of the raw data
//   b. Store last known values
//   c. Remove duplicate records
// Let's create two materialized views: one for hourly aggregates and one for last known values.
// We'll also backfill the existing records. You will typically use async, but for this
// sample we'll wait for the command to complete.
.show materialized-views

// Hourly averages of temperature, battery level and humidity per device.
.create materialized-view with (backfill=true) Hourly_Average_Mview on table Thermostats
{
    Thermostats
    | summarize
        Temp         = avg(Temp),
        BatteryLevel = avg(BatteryLevel),
        Humidity     = avg(Humidity)
      by DeviceId, TimeStamp = bin(EnqueuedTimeUTC, 1h)
}

// Query the view: hourly rows with a TimeStamp inside the last hour, capped at 1000 rows.
Hourly_Average_Mview
| where TimeStamp > ago(1h)
| limit 1000
// Now let's create a "current" view: for each device, the row with the greatest EnqueuedTimeUTC.
.create materialized-view with (backfill=true) Current_Mview on table Thermostats
{
    Thermostats
    | summarize arg_max(EnqueuedTimeUTC, *) by DeviceId
}

// Query the view.
Current_Mview
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment