Skip to content

Instantly share code, notes, and snippets.

@ri0day
Last active March 26, 2022 07:12
Show Gist options
  • Save ri0day/daeaa9efe96d4b7bde35b64326bd62e6 to your computer and use it in GitHub Desktop.
Save ri0day/daeaa9efe96d4b7bde35b64326bd62e6 to your computer and use it in GitHub Desktop.
discovery.go
package aliyuncms
import (
"encoding/json"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/aliyun/alibaba-cloud-sdk-go/sdk"
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth"
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/responses"
"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
"github.com/aliyun/alibaba-cloud-sdk-go/services/rds"
"github.com/aliyun/alibaba-cloud-sdk-go/services/slb"
"github.com/aliyun/alibaba-cloud-sdk-go/services/vpc"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/limiter"
"github.com/pkg/errors"
)
// discoveryRequest is a marker type for the per-project SDK request objects
// kept by discoveryTool. It declares no methods, so any value satisfies it;
// getRPCReqFromDiscoveryRequest validates the concrete shape at run time.
type discoveryRequest interface {
}
// aliyunSdkClient is the only part of the Aliyun SDK client that discovery
// relies on. The client returned by sdk.NewClientWithOptions is stored under
// this interface by newDiscoveryTool.
type aliyunSdkClient interface {
// ProcessCommonRequest performs req and returns its response or an error.
ProcessCommonRequest(req *requests.CommonRequest) (response *responses.CommonResponse, err error)
}
// discoveryTool is an object that provides the discovery feature: a background
// goroutine periodically queries the API in every configured region and
// publishes changed results on dataChan.
type discoveryTool struct {
req map[string]discoveryRequest //Discovery request per region (its concrete type is specific per object type)
rateLimit int //Rate limit for API queries, as it is limited by the API backend (handed to the rate limiter with a one second period)
reqDefaultPageSize int //Default page size while querying data from API (how many objects per request)
cli map[string]aliyunSdkClient //API client per region, which performs the discovery request
respRootKey string //Root key in JSON response where to look for discovery data
respObjectIDKey string //Key in element of array under root key, that stores object ID
//For example InstanceId for ECS or DBInstanceId for RDS.
//NOTE(review): the original comment says this key is also used in dimension filtering - that use is not visible in this file, confirm.
wg sync.WaitGroup //WG for primary discovery goroutine
interval time.Duration //Discovery interval
done chan bool //Done channel to stop primary discovery goroutine
dataChan chan map[string]interface{} //Discovery data, keyed by object ID
lg telegraf.Logger //Telegraf logger (should be provided)
}
// parsedDResp holds what parseDiscoveryResponse extracts from a single page
// of a discovery API response.
type parsedDResp struct {
data []interface{} //Discovered objects: the array found under the response root key
totalCount int //Total number of objects, as reported by the response
pageSize int //Page size counter, as reported by the response
pageNumber int //Number of the page this response represents, as reported by the response
}
// getRPCReqFromDiscoveryRequest is a utility function to map between aliyun
// request primitives: it returns the *requests.RpcRequest held in a field of
// the struct that req points to.
//
// discoveryRequest represents different types of discovery requests, so the
// field is located with reflection. req must be a non-nil pointer to a struct.
// An error (never a panic) is returned when req is nil, is not a pointer to a
// struct, or has no accessible field of type *requests.RpcRequest.
func getRPCReqFromDiscoveryRequest(req discoveryRequest) (*requests.RpcRequest, error) {
	reqVal := reflect.ValueOf(req)

	// reflect.ValueOf(nil) is the zero Value and calling Type() on it panics,
	// so an untyped nil has to be rejected before the type is inspected.
	if !reqVal.IsValid() {
		return nil, errors.Errorf("Not expected type of the discovery request object: %q, %q", "<nil>", reqVal.Kind())
	}
	if reqVal.Kind() != reflect.Ptr || reqVal.IsNil() {
		return nil, errors.Errorf("Not expected type of the discovery request object: %q, %q", reqVal.Type(), reqVal.Kind())
	}

	structVal := reqVal.Elem()
	// NumField panics for anything but a struct.
	if structVal.Kind() != reflect.Struct {
		return nil, errors.Errorf("Didn't find *requests.RpcRequest embedded struct in %q", structVal.Type())
	}

	// Compare type identity instead of the printed type name, which any
	// package called "requests" could reproduce.
	rpcReqType := reflect.TypeOf((*requests.RpcRequest)(nil))
	for i := 0; i < structVal.NumField(); i++ {
		field := structVal.Field(i)
		if field.Type() != rpcReqType {
			continue
		}
		//Unexported fields can't be read through reflection
		if !field.CanInterface() {
			return nil, errors.Errorf("Can't get interface of %v", field)
		}
		//The assertion can't fail: the field type was matched above
		return field.Interface().(*requests.RpcRequest), nil
	}

	return nil, errors.Errorf("Didn't find *requests.RpcRequest embedded struct in %q", structVal.Type())
}
// newDiscoveryTool returns a discovery tool object.
// The object is used to periodically get data about aliyun objects and send this
// data into a channel. The intention is to enrich reported metrics with discovery data.
//
// Discovery is supported for a limited set of object types (defined by project:
// ECS, RDS, SLB and VPC EIP) and can be extended in future.
// Discovery can be limited by regions; if regions is empty, all regions from
// aliyunRegionList are queried.
// Requests against the API may incur additional costs, consult the aliyun API documentation.
//
// Parameters:
//   - regions: regions to query, one request and one API client is built per region
//   - project: CMS project name that selects the kind of objects to discover
//   - lg: logger (should be provided)
//   - credential: credential used to build the API clients
//   - rateLimit: API rate limit; a non-positive value is replaced by 1
//   - discoveryInterval: interval between two discovery runs
//
// An error is returned if the project is not supported or not recognized, if an
// API client can't be created, or if no request could be built at all.
func newDiscoveryTool(regions []string, project string, lg telegraf.Logger, credential auth.Credential, rateLimit int, discoveryInterval time.Duration) (*discoveryTool, error) {
	if len(regions) == 0 {
		regions = aliyunRegionList
		lg.Infof("'regions' is not provided! Discovery data will be queried across %d regions:\n%s",
			len(aliyunRegionList), strings.Join(aliyunRegionList, ","))
	}

	//Zero can be a rounding case; a negative limit makes no sense either
	if rateLimit <= 0 {
		rateLimit = 1
	}

	/*
		responseRootKey is the key in the discovery API response under which the
		discovered objects are found. For 'DescribeLoadBalancers' the response
		is a json of the following structure:
		{
			...
			"LoadBalancers": {
				"LoadBalancer": [ here comes objects, one per every instance]
			}
		}
		The root key is the action name without its first word 'Describe', except
		for RDS where it is 'Items'. That is why it is set explicitly per project.
	*/
	var responseRootKey, responseObjectIDKey string

	dscReq := make(map[string]discoveryRequest, len(regions))
	cli := make(map[string]aliyunSdkClient, len(regions))

	for _, region := range regions {
		switch project {
		case "acs_ecs_dashboard":
			dscReq[region] = ecs.CreateDescribeInstancesRequest()
			responseRootKey = "Instances"
			responseObjectIDKey = "InstanceId"
		case "acs_rds_dashboard":
			dscReq[region] = rds.CreateDescribeDBInstancesRequest()
			responseRootKey = "Items"
			responseObjectIDKey = "DBInstanceId"
		case "acs_slb_dashboard":
			dscReq[region] = slb.CreateDescribeLoadBalancersRequest()
			responseRootKey = "LoadBalancers"
			responseObjectIDKey = "LoadBalancerId"
		case "acs_vpc_eip":
			dscReq[region] = vpc.CreateDescribeEipAddressesRequest()
			//Without the root key every EIP response is rejected by the parser
			responseRootKey = "EipAddresses"
			responseObjectIDKey = "AllocationId"
		//Known projects without discovery support.
		//acs_oss: oss is really complicated, it uses its own format. As a possible
		//solution we can mimic the request format supported by oss.
		//acs_cdn: API replies are in its own format.
		case "acs_memcache", "acs_ocs", "acs_oss", "acs_kvstore", "acs_mns_new", "acs_cdn",
			"acs_polardb", "acs_gdb", "acs_ads", "acs_mongodb", "acs_express_connect", "acs_fc",
			"acs_nat_gateway", "acs_sls_dashboard", "acs_containerservice_dashboard", "acs_vpn",
			"acs_bandwidth_package", "acs_cen", "acs_ens", "acs_opensearch", "acs_scdn", "acs_drds",
			"acs_iot", "acs_directmail", "acs_elasticsearch", "acs_ess_dashboard", "acs_streamcompute",
			"acs_global_acceleration", "acs_hitsdb", "acs_kafka", "acs_openad", "acs_pcdn", "acs_dcdn",
			"acs_petadata", "acs_videolive", "acs_hybriddb", "acs_adb", "acs_mps", "acs_maxcompute_prepay",
			"acs_hdfs", "acs_ddh", "acs_hbr", "acs_hdr", "acs_cds":
			return nil, errors.Errorf("no discovery support for project %q", project)
		default:
			return nil, errors.Errorf("project %q is not recognized by discovery...", project)
		}

		client, err := sdk.NewClientWithOptions(region, sdk.NewConfig(), credential)
		if err != nil {
			return nil, err
		}
		cli[region] = client
	}

	if len(dscReq) == 0 || len(cli) == 0 {
		return nil, errors.Errorf("Can't build discovery request for project: %q,\nregions: %v", project, regions)
	}

	return &discoveryTool{
		req:                dscReq,
		cli:                cli,
		respRootKey:        responseRootKey,
		respObjectIDKey:    responseObjectIDKey,
		rateLimit:          rateLimit,
		interval:           discoveryInterval,
		reqDefaultPageSize: 100,
		//Buffer of one, so a single result can be handed over without a waiting reader
		dataChan: make(chan map[string]interface{}, 1),
		lg:       lg,
	}, nil
}
// parseDiscoveryResponse extracts the discovered objects and the paging
// counters from one page of a discovery API response.
//
// The response body must be a JSON object that contains dt.respRootKey, whose
// value is an object holding (at least) one array; the first array found is
// taken as the discovered data. The paging counters are optional and default
// to 0 when they are absent or not numeric. Both naming schemes are accepted:
// "TotalRecordCount"/"PageRecordCount" and "TotalCount"/"PageSize".
//
// An error is returned if the response is empty, is not valid JSON, or does
// not have the expected structure.
func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse) (*parsedDResp, error) {
	if resp == nil {
		return nil, errors.Errorf("No data in response to be parsed")
	}

	data := resp.GetHttpContentBytes()
	if len(data) == 0 { //No data
		return nil, errors.Errorf("No data in response to be parsed")
	}

	fullOutput := map[string]interface{}{}
	if err := json.Unmarshal(data, &fullOutput); err != nil {
		return nil, errors.Errorf("Can't parse JSON from discovery response: %v", err)
	}

	rootVal, foundRootKey := fullOutput[dt.respRootKey]
	if !foundRootKey {
		return nil, errors.Errorf("Didn't find root key %q in discovery response", dt.respRootKey)
	}

	rootKeyVal, ok := rootVal.(map[string]interface{})
	if !ok {
		return nil, errors.Errorf("Content of root key %q, is not an object: %v", dt.respRootKey, rootVal)
	}

	//It should contain the array with discovered data
	var (
		pdResp        = &parsedDResp{}
		foundDataItem bool
	)
	for _, item := range rootKeyVal {
		if pdResp.data, foundDataItem = item.([]interface{}); foundDataItem {
			break
		}
	}
	if !foundDataItem {
		return nil, errors.Errorf("Didn't find array item in root key %q", dt.respRootKey)
	}

	//intField returns the first of keys that holds a JSON number, or 0.
	//The checked assertion keeps a null or string counter from causing a panic.
	intField := func(keys ...string) int {
		for _, key := range keys {
			if num, isNum := fullOutput[key].(float64); isNum {
				return int(num)
			}
		}
		return 0
	}

	pdResp.totalCount = intField("TotalRecordCount", "TotalCount")
	pdResp.pageSize = intField("PageRecordCount", "PageSize")
	pdResp.pageNumber = intField("PageNumber")

	return pdResp, nil
}
// getDiscoveryData queries all pages of discovery data with the given client
// and request, and returns the discovered objects keyed by their object ID
// (the value stored under dt.respObjectIDKey).
//
// If lmtr is not nil, a value is received from it before every API call (rate
// limiting). Paging stops as soon as at least the reported total number of
// objects is collected, or a page comes back empty - the latter prevents an
// endless loop of API calls when the reported total can't be reached.
// Objects without a string object ID are skipped.
//
// The "PageNumber" query parameter is set on req while paging and removed
// again before returning.
func (dt *discoveryTool) getDiscoveryData(cli aliyunSdkClient, req *requests.CommonRequest, lmtr chan bool) (map[string]interface{}, error) {
	var discoveryData []interface{}

	//The caller shares the query parameter map with the per-region request
	//object, so the paging cursor must not survive this call
	defer delete(req.QueryParams, "PageNumber")

	for {
		if lmtr != nil {
			<-lmtr //Rate limiting
		}

		resp, err := cli.ProcessCommonRequest(req)
		if err != nil {
			return nil, err
		}

		pDResp, err := dt.parseDiscoveryResponse(resp)
		if err != nil {
			return nil, err
		}
		discoveryData = append(discoveryData, pDResp.data...)

		//Pagination
		req.QueryParams["PageNumber"] = strconv.Itoa(pDResp.pageNumber + 1)

		//All data received, or no progress can be made any more
		if len(pDResp.data) == 0 || len(discoveryData) >= pDResp.totalCount {
			break
		}
	}

	//Map data to appropriate shape before return
	preparedData := make(map[string]interface{}, len(discoveryData))
	for _, raw := range discoveryData {
		elem, ok := raw.(map[string]interface{})
		if !ok {
			return nil, errors.Errorf("can't parse input data element, not a map[string]interface{} type")
		}
		if objectID, ok := elem[dt.respObjectIDKey].(string); ok {
			preparedData[objectID] = elem
		}
	}

	return preparedData, nil
}
// getDiscoveryDataAcrossRegions runs the discovery query against every region
// that has an API client and merges the per-region results into one map keyed
// by object ID. lmtr is handed to getDiscoveryData unchanged.
//
// The first failure (no request for a region, a malformed request object, or a
// failed query) aborts the whole run and is returned as the error.
func (dt *discoveryTool) getDiscoveryDataAcrossRegions(lmtr chan bool) (map[string]interface{}, error) {
	merged := map[string]interface{}{}

	for region, client := range dt.cli {
		regionReq, found := dt.req[region]
		if !found {
			return nil, errors.Errorf("Error building common discovery request: not valid region %q", region)
		}

		rpc, err := getRPCReqFromDiscoveryRequest(regionReq)
		if err != nil {
			return nil, err
		}

		//A common request is built from the typed one, so that everything
		//below is identical for every aliyun object type (project)
		common := requests.NewCommonRequest()
		common.Method = rpc.GetMethod()
		common.Product = rpc.GetProduct()
		common.Domain = rpc.GetDomain()
		common.Version = rpc.GetVersion()
		common.Scheme = rpc.GetScheme()
		common.ApiName = rpc.GetActionName()
		//The parameter map is shared with the typed request, not copied
		common.QueryParams = rpc.QueryParams
		common.QueryParams["PageSize"] = strconv.Itoa(dt.reqDefaultPageSize)
		common.TransToAcsRequest()

		//Query this region and fold its objects into the overall result
		regionData, err := dt.getDiscoveryData(client, common, lmtr)
		if err != nil {
			return nil, err
		}
		for objectID, object := range regionData {
			merged[objectID] = object
		}
	}

	return merged, nil
}
// start the discovery polling.
// A goroutine queries discovery data across all regions on every tick of
// dt.interval. In case something new is found (the data differs from the
// previously reported one) it is reported back through dt.dataChan.
// Query errors are logged and the run is retried on the next tick.
//
// The goroutine ends once dt.done is closed, even while it is waiting for a
// reader of dt.dataChan; data that was not handed over by then is dropped.
func (dt *discoveryTool) start() {
	//Initializing channel
	dt.done = make(chan bool)

	dt.wg.Add(1)
	go func() {
		defer dt.wg.Done()

		ticker := time.NewTicker(dt.interval)
		defer ticker.Stop()

		lmtr := limiter.NewRateLimiter(dt.rateLimit, time.Second)
		defer lmtr.Stop()

		var lastData map[string]interface{}

		for {
			select {
			case <-dt.done:
				return
			case <-ticker.C:
				data, err := dt.getDiscoveryDataAcrossRegions(lmtr.C)
				if err != nil {
					dt.lg.Errorf("Can't get discovery data: %v", err)
					continue
				}

				if reflect.DeepEqual(data, lastData) {
					continue
				}

				//Keep an own (shallow) copy for the next comparison, as
				//'data' itself is handed over to the reader
				lastData = make(map[string]interface{}, len(data))
				for k, v := range data {
					lastData[k] = v
				}

				//Send discovery data in blocking mode, but stay stoppable:
				//a plain send could not observe 'done' while no one reads
				select {
				case dt.dataChan <- data:
				case <-dt.done:
					return
				}
			}
		}
	}()
}
// stop the discovery loop, making sure
// all data is read from 'dataChan'.
//
// It signals the discovery goroutine to end and returns as soon as that
// goroutine has exited. While waiting, dt.dataChan is drained so a goroutine
// blocked on sending can't hold up the shutdown; whatever is left in the
// channel buffer afterwards is discarded too.
// It is a no-op if start was never called.
func (dt *discoveryTool) stop() {
	if dt.done == nil { //Not started: closing a nil channel would panic
		return
	}
	close(dt.done)

	//Turn the wait group into a channel, so waiting can be combined with
	//draining in one select
	exited := make(chan struct{})
	go func() {
		dt.wg.Wait()
		close(exited)
	}()

	for {
		select {
		case <-dt.dataChan: //Unblock go routine by reading from dt.dataChan
		case <-exited:
			//No more senders, drop what is still buffered
			for {
				select {
				case <-dt.dataChan:
				default:
					return
				}
			}
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment