Created
September 7, 2019 15:12
-
-
Save huichen/fed0ce62409f865695509d493b414b75 to your computer and use it in GitHub Desktop.
采用多线程和采样算法较快地统计阿里云 tablestore 中满足某个条件的记录数
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
// 假设你的 tablestore 中有两个主键,其中一个是另一个的 128bit md5 hash string | |
// 可以使用这个程序较快地统计满足某个条件的记录数 | |
// 采用了多线程和采样,可以将统计速度提升 100 倍 | |
// 我的经验,如果 sampleRatio = 0.01 的情况下,假设表格中有 100 万行,统计一遍大约需要 10 秒
import ( | |
"fmt" | |
"log" | |
"math/big" | |
"sync/atomic" | |
"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore" | |
) | |
var ( | |
counter uint64 | |
tsClient *tablestore.TableStoreClient | |
// 请替换成你的表信息 | |
endpoint = "xxx" | |
instance = "xxx" | |
keyId = "xxx" | |
keySecret = "xxx" | |
tableName = "xxx" | |
pk = "key" | |
pkHash = "key_hash" | |
// 使用多少个线程并行统计 | |
threads = 10 | |
// 只采样到这么大的比例就停止,0.01 是个合理的值,如果你允许跑更长的时间,请适当增加这个值 | |
sampleRatio = 0.01 | |
) | |
func main() { | |
// 这里替换成你的接入配置信息 | |
tsClient = tablestore.NewClient(endpoint, instance, keyId, keySecret) | |
// 得到 md5 128bit hash 的区间上限 fffff... (32个f) | |
max := big.NewInt(1) | |
two := big.NewInt(2) | |
for i := 0; i < 128; i++ { | |
max.Mul(max, two) | |
} | |
max.Sub(max, big.NewInt(1)) | |
// delta = max / threads | |
delta := big.NewInt(0) | |
delta.Set(max) | |
delta.Div(delta, big.NewInt(int64(threads))) | |
// 用于从线程中返回比例值 | |
backChan := make(chan float64, threads) | |
// 启动 threads 个线程,每个线程处理 [0, max] 的 threads 分之一区间 | |
start := big.NewInt(0) | |
for i := 0; i < threads; i++ { | |
// end = start + delta | |
end := big.NewInt(0) | |
if i == threads-1 { | |
// 最后一个线程,end = max | |
end.Set(max) | |
} else { | |
end.Set(start) | |
end.Add(end, delta) | |
} | |
// 复制一份 start, end 出来传参,Set 操作是深拷贝 | |
iStart := big.NewInt(0) | |
iEnd := big.NewInt(0) | |
iStart.Set(start) | |
iEnd.Set(end) | |
// 启动线程 | |
go count(iStart, iEnd, backChan) | |
// start = end | |
start.Set(end) | |
start.Add(start, big.NewInt(1)) | |
} | |
// 等待线程返回 | |
ratio := 0.0 | |
for i := 0; i < threads; i++ { | |
ratio = ratio + <-backChan | |
} | |
// 计算记录个数 | |
ratio = ratio / float64(threads) | |
log.Printf("%f", float64(counter)/ratio) | |
return | |
} | |
// bigIntToHex formats num as a lowercase hexadecimal string, left-padded with
// zeros to a minimum width of 32 characters (the length of a 128-bit MD5
// digest in hex). Values that need more than 32 hex digits are returned in
// full, without truncation. For a negative num the minus sign comes first and
// the zero padding follows it.
func bigIntToHex(num *big.Int) string {
	// *big.Int implements fmt.Formatter and honours the width and '0' flags,
	// so one format call replaces the manual pad-one-digit-at-a-time loop.
	return fmt.Sprintf("%032x", num)
}
func count(iStart, iEnd *big.Int, backChan chan float64) { | |
start := bigIntToHex(iStart) | |
end := bigIntToHex(iEnd) | |
getRangeRequest := &tablestore.GetRangeRequest{} | |
rangeRowQueryCriteria := &tablestore.RangeRowQueryCriteria{} | |
rangeRowQueryCriteria.TableName = tableName | |
// 添加查询范围,这里假设只有 pkHashName,和 pk | |
startPK := new(tablestore.PrimaryKey) | |
startPK.AddPrimaryKeyColumn(pkHash, start) | |
startPK.AddPrimaryKeyColumnWithMinValue(pk) | |
endPK := new(tablestore.PrimaryKey) | |
endPK.AddPrimaryKeyColumn(pkHash, end) | |
endPK.AddPrimaryKeyColumnWithMaxValue(pk) | |
rangeRowQueryCriteria.StartPrimaryKey = startPK | |
rangeRowQueryCriteria.EndPrimaryKey = endPK | |
// 其他参数 | |
rangeRowQueryCriteria.Direction = tablestore.FORWARD | |
rangeRowQueryCriteria.MaxVersion = 1 | |
rangeRowQueryCriteria.Limit = 100 | |
getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria | |
// ratio = nu / de | |
// nu = 结束值 - iStart | |
// de = iEnd - iStart | |
de := big.NewInt(0) | |
de.Set(iEnd) | |
de.Sub(de, iStart) | |
var ratioValue float64 | |
for { | |
getRangeResp, err := tsClient.GetRange(getRangeRequest) | |
// 异常情况处理 | |
if err != nil { | |
backChan <- ratioValue | |
return | |
} | |
// 查询到头的情况处理 | |
if len(getRangeResp.Rows) == 0 || getRangeResp.NextStartPrimaryKey == nil { | |
backChan <- ratioValue | |
return | |
} | |
for _, _ = range getRangeResp.Rows { | |
if true { // 换成你的条件 | |
atomic.AddUint64(&counter, 1) | |
} | |
} | |
// nu = 下个主键对应的整数 - iStart | |
nu := big.NewInt(1) | |
nu.SetString(getRangeResp.NextStartPrimaryKey.PrimaryKeys[0].Value.(string), 16) | |
nu.Sub(nu, iStart) | |
// ratio = nu / de | |
ratio := big.NewRat(1, 1) | |
ratio.SetFrac(nu, de) | |
v, _ := ratio.Float64() | |
log.Printf("ratio = %f", v) | |
if v > sampleRatio { | |
backChan <- v | |
return | |
} | |
getRangeRequest.RangeRowQueryCriteria.StartPrimaryKey = getRangeResp.NextStartPrimaryKey | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment