Created
July 8, 2022 09:47
-
-
Save Rohitrajak1807/62220c7a9287e6818f83be13aab92df4 to your computer and use it in GitHub Desktop.
Illustrates the use of Log Analytics Query API to get Max CPU millicores and Max Memory RSS in bytes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// getVirtualNodeUsageStats fetches the virtual node max CPU milli-cores and max memory used in bytes within a span of | |
// startDateTime to endDateTime. KQL requires the client to ensure that these times are in UTC and ISO 8061 format(RFC3339) | |
func getVirtualNodeUsageStats(ctx context.Context, tokenCredential azcore.TokenCredential, subscriptionId, clusterName, resourceGroup string, startDateTime, endDateTime time.Time) (map[string]float64, error) { | |
client, err := armoperationalinsights.NewWorkspacesClient(subscriptionId, tokenCredential, nil) | |
if err != nil { | |
zap.S().Error(err) | |
return nil, err | |
} | |
workspace, err := client.Get(ctx, resourceGroup, workspaceName, nil) | |
if err != nil { | |
zap.S().Error(err) | |
return nil, err | |
} | |
logAnalyticsQueryURL := "https://api.loganalytics.io/v1/workspaces/{workspace-id}/query" | |
logAnalyticsQueryURL = strings.ReplaceAll(logAnalyticsQueryURL, "{workspace-id}", *workspace.Workspace.Properties.CustomerID) | |
var body struct { | |
Query string `json:"query"` | |
} | |
body.Query = getUsageMetricQuery(subscriptionId, resourceGroup, clusterName, startDateTime, endDateTime) | |
bodyBytes, err := json.Marshal(body) | |
if err != nil { | |
zap.S().Error(err) | |
return nil, err | |
} | |
token, err := requestTokenWithLogAnalyticsScope(ctx, tokenCredential) | |
if err != nil { | |
zap.S().Error(err) | |
return nil, err | |
} | |
bytes.NewBuffer(bodyBytes) | |
req, err := http.NewRequest(http.MethodPost, logAnalyticsQueryURL, bytes.NewBuffer(bodyBytes)) | |
if err != nil { | |
zap.S().Error(err) | |
return nil, err | |
} | |
req.Header.Add("content-type", "application/json") | |
req.Header.Add("authorization", fmt.Sprintf("Bearer %s", token.Token)) | |
response, err := http.DefaultClient.Do(req) | |
if err != nil { | |
zap.S().Error(err) | |
return nil, err | |
} | |
responseBody, err := ioutil.ReadAll(response.Body) | |
if err != nil { | |
zap.S().Error(err) | |
return nil, err | |
} | |
defer response.Body.Close() | |
stats, err := deserializeAsMap(responseBody) | |
if err != nil { | |
if err == ErrNoData { | |
zap.S().Warnf("%s, %s, %s, %s", err.Error(), resourceGroup, clusterName, *workspace.ID) | |
} | |
return nil, err | |
} | |
return stats, nil | |
} | |
// getUsageMetricQuery builds the KQL query for fetching max CPU and memory usage | |
func getUsageMetricQuery(subscriptionId, resourceGroup, clusterName string, startDate, endDate time.Time) string { | |
startDate = startDate.UTC() | |
endDate = endDate.UTC() | |
startDateString := startDate.Format(time.RFC3339) | |
endDateString := endDate.Format(time.RFC3339) | |
var query = ` | |
let subscriptionId = '{subscription-id}'; | |
let resourceGroup = '{resource-group}'; | |
let clusterName = '{cluster-name}'; | |
let startDateTime = datetime({start-tate-time}); | |
let endDateTime = datetime({end-date-time}); | |
let clusterId = strcat('/subscriptions/', subscriptionId, '/resourceGroups/', resourceGroup, '/providers/Microsoft.ContainerService/managedClusters/', clusterName); | |
let memoryUsageCounterName = 'memoryRssBytes'; | |
let primaryInventory = KubePodInventory | | |
where TimeGenerated >= startDateTime | | |
where TimeGenerated < endDateTime | | |
where isnotempty(ClusterName) | | |
where isnotempty(Namespace) | | |
extend Node = Computer | | |
where ClusterId == clusterId | | |
where Node == 'virtual-node-aci-linux' | | |
project TimeGenerated, ClusterId, ClusterName, Namespace, ServiceName, Node = Computer, ControllerName, Pod = Name, ContainerInstance = ContainerName, ContainerID, InstanceName, PerfJoinKey = strcat(ClusterId, '/', ContainerName), ReadySinceNow = format_timespan(endDateTime - ContainerCreationTimeStamp, 'ddd.hh:mm:ss.fff'), Restarts = ContainerRestartCount, Status = ContainerStatus, ContainerStatusReason = columnifexists('ContainerStatusReason', ''), ControllerKind = ControllerKind, PodStatus, ControllerId = strcat(ClusterId, '/', Namespace, '/', ControllerName); | |
let latestContainersByController = primaryInventory | | |
where isnotempty(Node) | | |
summarize arg_max(TimeGenerated, *) by PerfJoinKey | | |
project ControllerId, PerfJoinKey; | |
let filteredMemoryUsage = Perf | | |
where TimeGenerated >= startDateTime | | |
where TimeGenerated < endDateTime | | |
where ObjectName == 'K8SContainer' | | |
where InstanceName startswith clusterId | | |
project TimeGenerated, CounterName, CounterValue, InstanceName, Node = Computer | | |
where Node == 'virtual-node-aci-linux'; | |
let memoryUsageByController = filteredMemoryUsage | | |
where CounterName =~ memoryUsageCounterName | | |
extend PerfJoinKey = InstanceName | | |
summarize Value = max(CounterValue) by PerfJoinKey, CounterName | | |
join (latestContainersByController) on PerfJoinKey | | |
summarize Value = sum(Value) by ControllerId, CounterName | | |
project ControllerId, CounterName, MemoryAggregationValue = Value; | |
let CPUUsageCounterName = 'cpuUsageNanoCores'; | |
let filteredCPUUsage = Perf | | |
where TimeGenerated >= startDateTime | | |
where TimeGenerated < endDateTime | | |
where ObjectName == 'K8SContainer' | | |
where InstanceName startswith clusterId | | |
project TimeGenerated, CounterName, CounterValue, InstanceName, Node = Computer | | |
where Node == 'virtual-node-aci-linux'; | |
let CPUUsageByController = filteredCPUUsage | | |
where CounterName =~ CPUUsageCounterName | | |
extend PerfJoinKey = InstanceName | | |
summarize Value = max(CounterValue) by PerfJoinKey, CounterName | | |
join (latestContainersByController) on PerfJoinKey | | |
summarize Value = sum(Value) by ControllerId, CounterName | | |
project ControllerId, CounterName, CPUAggregationValue = Value/1000000; | |
let maxMemoryUsage = primaryInventory | | |
distinct ControllerId, ControllerName, ControllerKind, Namespace | | |
join kind=leftouter (memoryUsageByController) on ControllerId | | |
project MaxMemoryRSS = MemoryAggregationValue, ControllerId; | |
let maxCPUUsage = primaryInventory | | |
distinct ControllerId, ControllerName, ControllerKind, Namespace | | |
join kind=leftouter (CPUUsageByController) on ControllerId | | |
project MaxCPUUsage = CPUAggregationValue, ControllerId; | |
maxMemoryUsage | | |
join(maxCPUUsage) on ControllerId | | |
project MaxCPUUsage, MaxMemoryRSS | | |
summarize val1 = sum(MaxCPUUsage), val2 = sum(MaxMemoryRSS)| | |
project MaxCPUUsage = val1, MaxMemoryRSS = val2;` | |
query = strings.ReplaceAll(query, "{subscription-id}", subscriptionId) | |
query = strings.ReplaceAll(query, "{resource-group}", resourceGroup) | |
query = strings.ReplaceAll(query, "{cluster-name}", clusterName) | |
query = strings.ReplaceAll(query, "{start-tate-time}", startDateString) | |
query = strings.ReplaceAll(query, "{end-date-time}", endDateString) | |
return query | |
} | |
// deserializeAsMap deserializes the table JSON response from the log analytics API into a map | |
func deserializeAsMap(body []byte) (map[string]float64, error) { | |
var tableAsMap = make(map[string]float64) | |
var baseTable map[string][]map[string]interface{} | |
if err := json.Unmarshal(body, &baseTable); err != nil { | |
zap.S().Error(err) | |
return nil, err | |
} | |
columns, ok := baseTable["tables"][0]["columns"].([]interface{}) | |
if !ok { | |
zap.S().Error(ErrTypeConvert) | |
return nil, ErrTypeConvert | |
} | |
if len(columns) == 0 { | |
return nil, ErrNoData | |
} | |
rows, ok := baseTable["tables"][0]["rows"].([]interface{}) | |
if !ok { | |
zap.S().Error(ErrTypeConvert) | |
return nil, ErrTypeConvert | |
} | |
if len(rows) == 0 { | |
return nil, ErrNoData | |
} | |
singleRow, ok := rows[0].([]interface{}) | |
if !ok { | |
zap.S().Error(ErrTypeConvert) | |
return nil, ErrTypeConvert | |
} | |
for i, col := range columns { | |
header, ok := col.(map[string]interface{}) | |
if !ok { | |
zap.S().Error(ErrTypeConvert) | |
return nil, ErrTypeConvert | |
} | |
name, ok := header["name"].(string) | |
if !ok { | |
zap.S().Error(ErrTypeConvert) | |
return nil, ErrTypeConvert | |
} | |
value, ok := singleRow[i].(float64) | |
if !ok { | |
if singleRow[i] == nil { | |
zap.S().Warnf("%v is nil, setting to 0", name) | |
tableAsMap[name] = 0 | |
continue | |
} | |
zap.S().Error(ErrTypeConvert) | |
return nil, ErrTypeConvert | |
} | |
tableAsMap[name] = value | |
} | |
return tableAsMap, nil | |
} | |
// requestTokenWithLogAnalyticsScope manually acquires a token with required scopes for log analytics REST endpoints | |
func requestTokenWithLogAnalyticsScope(ctx context.Context, tokenCredential azcore.TokenCredential) (*azcore.AccessToken, error) { | |
const scope = "https://api.loganalytics.io/.default" | |
token, err := tokenCredential.GetToken(ctx, policy.TokenRequestOptions{ | |
Scopes: []string{scope}, | |
}) | |
if err != nil { | |
zap.S().Error(err) | |
return nil, err | |
} | |
return &token, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment