- 对推送保存记录表 push_app_task 和 push_device_task 的查询:
  - 主 Key 上的搜索(只带 partitionKey 与同时带上 clusterKey 的区别)
  - 索引上的搜索(单个与多个索引的区别;单个索引上少值与多值的区别)
  - 混合主 Key 和索引上的搜索
  - 表上的聚集操作(计算 count(*))和迭代累加计数
- 对 Device 和 APP 模块公用表 deviceapp_[new_]info 的查询
-- Benchmark table for push records.
-- Partition key: account_id; clustering key: msg_id (one row per message of an account).
-- The table is recreated from scratch, so any existing data is discarded.
drop table if exists cloud.push_app_task;

create table cloud.push_app_task (
    account_id   bigint,
    msg_id       text,
    app_key      text,
    expiry       int,
    method       text,
    msg_type     text,
    oa_task_id   text,
    oa_task_type text,
    payload      text,
    retries      int,
    status_code  int,
    time         timestamp,
    primary key ((account_id), msg_id)
);

-- Secondary indexes exercised by the index-search benchmarks
-- (the oa_task_id index is created on the following line).
create index push_app_task_msg_type_idx     on cloud.push_app_task (msg_type);
create index push_app_task_status_code_idx  on cloud.push_app_task (status_code);
create index push_app_task_oa_task_type_idx on cloud.push_app_task (oa_task_type);
create index push_app_task_oa_task_id_idx on cloud.push_app_task(oa_task_id);

| 数据 | 100w | 1000w |
|---|---|---|
| account_id | 10000个*100 | 1000个*10000、10000个*1000 |
| oa_task_type | appRelease、pluginRelease、deviceMessage (1:1:8) | |
| oa_task_id | 10、100个 | |
| status_code | 0、1 | |
-- Query templates for the benchmark; '?' is a bind marker
-- (presumably run as prepared statements through the Java driver).

-- Primary-key lookups: partition key only vs. partition key + clustering key.
select * from cloud.push_app_task where account_id = ?
select * from cloud.push_app_task where account_id = ? and msg_id = ?

-- Secondary-index lookups: one index, then several indexes combined.
-- Cassandra requires ALLOW FILTERING as soon as more than one indexed column is
-- restricted; without it the two multi-index statements are rejected with
-- InvalidRequestException. Only one of the indexes is actually scanned.
select * from cloud.push_app_task where status_code = ?
select * from cloud.push_app_task where oa_task_id = ?
select * from cloud.push_app_task where oa_task_type = ? and oa_task_id = ? allow filtering
select * from cloud.push_app_task where oa_task_type = ? and oa_task_id = ? and status_code = ? allow filtering

-- Mixed: partition key plus a single secondary index (no ALLOW FILTERING needed).
select * from cloud.push_app_task where account_id = ? and status_code = ?

-- Full-table scan and aggregation, each select paired with the equivalent count(*).
select * from cloud.push_app_task
select count(*) from cloud.push_app_task
select * from cloud.push_app_task where status_code = ?
select count(*) from cloud.push_app_task where status_code = ?

在 120 机器(172.29.88.120)上:
生成 10000*100 共 100W 条数据时: cassandra CPU 131%,内存 22.5%
执行查询时: CPU 110%,内存 22.5%
空闲时: CPU 3.3%,内存 22.8%
现象: 只在 partitionKey 上的搜索,与同时带上 clusterKey 的搜索对比
分析:
[cloud@localhost apache-cassandra-2.1.5]$ time bin/cqlsh -e 'select * from cloud.push_app_task where account_id = 2' > /dev/null;
real 0m0.662s
user 0m0.518s
sys 0m0.113s
# java
cnt(rows): 190
time: 12
[cloud@localhost apache-cassandra-2.1.5]$ time bin/cqlsh -e "select * from cloud.push_app_task where account_id = 709 and msg_id = '1438067012824_um4oxMxp'" > /dev/null;
real 0m0.547s
user 0m0.413s
sys 0m0.086s
# java
cnt(rows): 1
time: 3
现象:索引上的搜索
分析:
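查询时延和返回的结果集大小有关,结果集越大,时延越高;多索引上搜索时,时延由 Cassandra 选中用来扫描的那个索引的单独搜索时延决定(该索引不一定是单独搜索时延最低的,见下文 msg_type 与 status_code 的例子);查询时延可能和索引类型有关(目前看好像无关),待验证;
查询时延与列基数有关,也与 Cassandra 选择的最佳过滤条件有关:如果选中的列是一个每个值对应条目很多的列,则时延受限。目前选择 Best IndexExpression 的策略是:选取索引表中每个值平均对应条目数(getMeanColumns)最少的索引列,也就是值的枚举数(基数)最大的索引列;该策略只看平均值,不看本次查询所取的具体值实际对应多少行。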
select * from cloud.push_app_task;
cnt(rows): 1000000
time: 20258
select * from cloud.push_app_task where status_code = 1;
cnt(rows): 865468
time: 185134
select * from cloud.push_app_task where status_code = 0;
cnt(rows): 134532
time: 29388
cnt(rows): 134532
time: 26330
select * from cloud.push_app_task where oa_task_id = '1';
cnt(rows): 11000
time: 2274
select * from cloud.push_app_task where oa_task_id = '11';
cnt(rows): 1000
time: 301
# ##### 以上对比 #################
# 查询时延和返回的结果集有关,结果集越大,时延越高;
select * from cloud.push_app_task where oa_task_type = 'appRelease';
cnt(rows): 100000
time: 21473
select * from cloud.push_app_task where oa_task_id = '1' allow filtering;
cnt(rows): 11000
time: 2522
select * from cloud.push_app_task where oa_task_type = 'appRelease' and oa_task_id = '1' allow filtering;
cnt(rows): 10000
time: 2479
select * from cloud.push_app_task where oa_task_type = 'appRelease' and status_code = 0 allow filtering;
cnt(rows): 10000
time: 20085
select * from cloud.push_app_task where msg_type = 'newApp';
cnt(rows): 100000
time: 19734
cnt(rows): 100000
time: 21572
select * from cloud.push_app_task where msg_type = 'newApp' and status_code = 0 allow filtering;
cnt(rows): 10000
time: 17427
select * from cloud.push_app_task where msg_type = 'newApp' and status_code = 1 allow filtering;
cnt(rows): 90000
time: 19694
cnt(rows): 90000
time: 21711
select * from cloud.push_app_task where msg_type = 'deviceMessage' and status_code=0 allow filtering;
cnt(rows): 100000
time: 153425
select * from cloud.push_app_task where msg_type = 'deviceMessage' and status_code=1 allow filtering;
cnt(rows): 700000
time: 154768
select * from cloud.push_app_task where msg_type = 'deviceMessage' ;
cnt(rows): 800000
time: 161218
select * from cloud.push_app_task where method = 'pushEvent'; # method 列上的索引是测试时临时添加的,不在上面的建表语句中
cnt(rows): 1000000
time: 192033
select * from cloud.push_app_task where method = 'pushEvent' and msg_type = 'newApp' allow filtering;
cnt(rows): 100000
time: 20706
select * from cloud.push_app_task where method = 'pushEvent' and status_code = 0 allow filtering;
cnt(rows): 134532
time: 29842
select * from cloud.push_app_task where method = 'pushEvent' and status_code = 1 allow filtering;
cnt(rows): 865468
time: 170509
select * from cloud.push_app_task where method = 'pushEvent' and msg_type = 'deviceMessage' allow filtering;
cnt(rows): 800000
time: 180373
select * from push_app_task where method = 'pushEvent' and msg_type = 'deviceMessage' and status_code = 0 allow filtering;
cnt(rows): 100000
time: 146193
现象: 对全表做 count(*),统计约 240,000 行数据就会超过 10s 超时(下文 cqlsh 实测中 limit 220000 已超时),并在 Cassandra 中占用一个线程;在 partitionKey 上做 count(*) 则很快;在只返回少量结果的查询条件下,速度也很快。
分析: count(*) 应该是先选出符合条件的结果,再在结果集上累加计数,即 时延 = 查询时延 + 遍历结果集的时间;且计数的负载由 Cassandra 承担,占用一个核。待验证。
详细:
当前 cqlsh 的速度不及 java-driver(见下方 #java 注释中的耗时)
[cloud@localhost apache-cassandra-2.1.5]$ time bin/cqlsh -e 'select count(*) from cloud.push_app_task limit 210000' > /dev/null;
real 0m9.833s #java 3511ms
user 0m0.704s
sys 0m0.144s
[cloud@localhost apache-cassandra-2.1.5]$ time bin/cqlsh -e 'select count(*) from cloud.push_app_task limit 220000' > /dev/null;
<stdin>:1:OperationTimedOut: errors={}, last_host=172.29.88.120
real 0m10.520s #java 3682ms
user 0m0.732s
sys 0m0.148s
[cloud@localhost apache-cassandra-2.1.5]$ time bin/cqlsh -e 'select * from cloud.push_app_task limit 10000' > /dev/null;
real 0m7.569s
user 0m6.655s
sys 0m0.808s
[cloud@localhost apache-cassandra-2.1.5]$ time bin/cqlsh -e 'select count(*) from cloud.push_app_task limit 10000' > /dev/null;
real 0m1.038s
user 0m0.387s
sys 0m0.105s
[cloud@localhost apache-cassandra-2.1.5]$ time bin/cqlsh -e 'select count(*) from cloud.push_app_task limit 100000' > /dev/null;
real 0m5.545s
user 0m0.545s
sys 0m0.123s
[cloud@localhost apache-cassandra-2.1.5]$ time bin/cqlsh -e 'select * from cloud.push_app_task limit 100000' > /dev/null;
real 1m13.051s
user 1m3.802s
sys 0m8.192s
执行查询过程中会出现异常: com.datastax.driver.core.exceptions.ReadTimeoutException: Cassandra timeout during read query at consistency ONE (1 responses were required but only 0 replica responded)
在频繁更新或删除的列上使用索引的危害:
Cassandra 会把“墓碑”(tombstone)存储到索引上,直到“墓碑”数量达到 100K 个单元的限制。当超过“墓碑”的限制后,使用该索引的查询将失败。
什么时候不使用索引
- 不要在基数很大的列上使用,因为需要在大量的值中查询小部分的数据。
- 不能在 counter 类型的列上使用索引。
- 不要在频繁更新或删除的列上使用。
- 除非是精确(窄范围)查询,不要用索引在一个大的分区中查找一行。
新插入一条记录:
INSERT INTO push_app_task (account_id, msg_id , method, msg_type, payload, status_code) VALUES ( 999001,'000_001','push','deviceMessage','test push xx11', 0);
之后执行如下查询会出现异常:
select * from cloud.push_app_task where method = 'push' and msg_type = 'deviceMessage' allow filtering;
com.datastax.driver.core.exceptions.ReadTimeoutException: Cassandra timeout during read query at consistency ONE (1 responses were required but only 0 replica responded)
在测试 pushservice 模块的 Cassandra 查询执行速度时,发现带多个索引条件(因此需要加 ALLOW FILTERING 关键字)的 CQL 查询,其执行速度受列值基数大的那个索引的查询速度影响。
执行CQL结果: cnt(rows) 表示返回行数, time(ms) 耗费时间毫秒
select * from cloud.push_app_task where msg_type = 'deviceMessage' ;
cnt(rows): 800000
time(ms): 161,218
select * from cloud.push_app_task where status_code = 0;
cnt(rows): 134532
time(ms): 29,388
select * from cloud.push_app_task where msg_type = 'deviceMessage' and status_code=0 allow filtering;
cnt(rows): 100000
time(ms): 153,425
发现在 msg_type 和 status_code 两个索引上的查询速度受 msg_type 索引的速度影响。
其中 msg_type 的值基数为 3,即 80% 是 msg_type=“deviceMessage”,10% 是 “newApp”,10% 是 “newPlugIn”;status_code 的基数为 2,即 0 和 1。
为了验证双索引查询速度受列值基数大的索引影响,我插入两条数据,把 status_code 的基数增加到 4,比 msg_type 的基数 3 要大。
cqlsh:cloud> INSERT INTO push_app_task (account_id, msg_id , method, msg_type, payload, status_code) VALUES ( 999001,'000_001','pushEvent','deviceMessage','test index ....', 3);
cqlsh:cloud> INSERT INTO push_app_task (account_id, msg_id , method, msg_type, payload, status_code) VALUES ( 999001,'000_002','pushEvent','deviceMessage','test index ....', 4);
执行 bin/nodetool rebuild_index cloud push_app_task push_app_task_status_code_idx 重建 status_code 上的索引。
再执行一遍
select * from cloud.push_app_task where msg_type = 'deviceMessage' and status_code=0 allow filtering;
cnt(rows): 100000
time(ms): 30,472
发现此时和预想一样,速度接近 status_code 索引上的速度。
建议
在过滤条件包含多个索引条件时,如果能预估到基数较大的那个索引列上查询效率不好,可以去掉这个过滤条件,改在 client 端程序逻辑中迭代过滤实现。比如:
我的这个查询,预估到 msg_type 索引列基数较大(会被 Cassandra 选中),但 status_code 索引查询效率更高,我就只执行 select * from cloud.push_app_task where status_code = 0;
然后在获得的结果集上遍历 Row 过滤 msg_type,速度要快很多: cnt(rows): 100000 time(ms): 33043,接近 status_code 索引上的速度。
追根究底
在cassandra源码中,找到这种策略的代码:
// org.apache.cassandra.db.index.keys.KeysSearcher
/**
 * Executes a secondary-index query. This is quoted from the Cassandra source,
 * presumably the 2.1.5 release used in these tests.
 *
 * Only ONE index is scanned: the one for the expression returned by
 * highestSelectivityPredicate(). The rows read through that index are passed, together
 * with the full filter, to baseCfs.filter(...). Presumably that is where the remaining
 * expressions of the clause are applied. NOTE(review): confirm in ColumnFamilyStore.
 * This matches the benchmark observation that a multi-index ALLOW FILTERING query costs
 * about as much as a single-index query on whichever index gets picked here.
 *
 * @param filter the query filter; asserted to carry a non-null, non-empty clause
 * @return the rows produced by baseCfs.filter(...)
 */
public List<Row> search(ExtendedFilter filter)
{
assert filter.getClause() != null && !filter.getClause().isEmpty();
// Choose the single expression whose index drives the scan.
// NOTE: highestSelectivityPredicate may return null ("No applicable indexes found");
// primary.column would then throw NullPointerException. Presumably callers guarantee
// that an applicable index exists.
final IndexExpression primary = highestSelectivityPredicate(filter.getClause());
final SecondaryIndex index = indexManager.getIndexForColumn(primary.column);
// TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
// as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made
// The write group and both read groups (base table and index table) stay open for the
// whole scan and are closed automatically by try-with-resources.
try (OpOrder.Group writeOp = baseCfs.keyspace.writeOrder.start(); OpOrder.Group baseOp = baseCfs.readOrdering.start(); OpOrder.Group indexOp = index.getIndexCfs().readOrdering.start())
{
return baseCfs.filter(getIndexedIterator(writeOp, filter, primary, index), filter);
}
}
// org.apache.cassandra.db.index.SecondaryIndexSearcher
/**
 * Chooses the single index expression of a query clause that will drive the index scan.
 * This is quoted from the Cassandra source.
 *
 * An expression is a candidate when all of the following hold:
 * - its column is handled by this searcher;
 * - it has an index with an index table;
 * - that index supports the expression's operator.
 *
 * For each candidate, the method reads the index table's getMeanColumns() and keeps the
 * expression with the SMALLEST mean. Each index-table partition presumably holds one
 * indexed value, so the mean is roughly "average rows per distinct value". In effect,
 * the column with the highest cardinality wins.
 *
 * expression.value is never consulted. An index can therefore be chosen even when the
 * specific value being queried matches many rows; with msg_type = 'deviceMessage',
 * this is exactly the slow case measured above.
 * Because the comparison is a strict '<', ties keep the earliest expression in clause order.
 *
 * @param clause the index expressions of the query
 * @return the chosen expression, or null if no applicable index was found
 */
protected IndexExpression highestSelectivityPredicate(List<IndexExpression> clause)
{
IndexExpression best = null;
int bestMeanCount = Integer.MAX_VALUE;
// index -> mean column count, used only for the trace message below
Map<SecondaryIndex, Integer> candidates = new HashMap<>();
for (IndexExpression expression : clause)
{
// skip columns belonging to a different index type
if (!columns.contains(expression.column))
continue;
SecondaryIndex index = indexManager.getIndexForColumn(expression.column);
if (index == null || index.getIndexCfs() == null || !index.supportsOperator(expression.operator))
continue;
// NOTE: this local int `columns` shadows the `columns` field tested a few lines above.
int columns = index.getIndexCfs().getMeanColumns();
candidates.put(index, columns);
// Keep the index with the fewest entries per indexed value on average.
if (columns < bestMeanCount)
{
best = expression;
bestMeanCount = columns;
}
}
// Record the candidate means and the chosen index in the query trace.
if (best == null)
Tracing.trace("No applicable indexes found");
else
Tracing.trace("Candidate index mean cardinalities are {}. Scanning with {}.",
FBUtilities.toString(candidates), indexManager.getIndexForColumn(best.column).getIndexName());
return best;
}
// org.apache.cassandra.db.DataTracker
/**
 * Average number of columns per partition over the table's SSTables, weighted by
 * each SSTable's partition count. This is quoted from the Cassandra source.
 *
 * For each SSTable:
 * - n is count() of its estimated-column-count histogram, presumably the number of
 *   partitions recorded;
 * - mean() is the average number of columns per partition.
 * The result is sum(mean_i * n_i) / sum(n_i), truncated to int.
 *
 * Only getSSTables() is consulted; data not yet in an SSTable is not counted.
 * Returns 0 when there are no SSTables, or when they record no partitions. Such an index
 * then looks the most selective to highestSelectivityPredicate, because 0 is below any
 * positive mean.
 */
public int getMeanColumns()
{
long sum = 0;
long count = 0;
for (SSTableReader sstable : getSSTables())
{
long n = sstable.getEstimatedColumnCount().count();
// weight this SSTable's mean by its partition count
sum += sstable.getEstimatedColumnCount().mean() * n;
count += n;
}
return count > 0 ? (int) (sum / count) : 0;
}