-
-
Save mwiewior/231bef07c18b37037eb16642a4be9bda to your computer and use it in GitHub Desktop.
CarbonData varia
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Start the shell with the CarbonData assembly on the classpath, e.g.:
//
// ./spark-shell -v --master yarn --deploy-mode client --driver-memory 1G --executor-memory 2G --executor-cores 2 \
//   --jars /tmp/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.3.2-hadoop2.7.2.jar \
//   --conf spark.hadoop.hive.metastore.uris=thrift://cdh01.cl.ii.pw.edu.pl:9083 \
//   --conf spark.hadoop.yarn.timeline-service.enabled=false \
//   --conf spark.driver.extraJavaOptions=-Dhdp.version=3.1.0.0-78 \
//   --conf spark.yarn.am.extraJavaOptions=-Dhdp.version=3.1.0.0-78 \
//   --conf spark.hadoop.metastore.catalog.default=hive
//
// `--master yarn --deploy-mode client` replaces the `yarn-client` master URL, which is
// deprecated since Spark 2.0 and removed in Spark 3.0.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

// Build a CarbonSession from the shell's SparkContext configuration (`sc` is provided by spark-shell).
// The HDFS path passed here is the CarbonData store path.
val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://cdh01.cl.ii.pw.edu.pl:8020/opt/carbonstore")
carbon.sql("use igap_dev")
// Copy the external genotype table into a CarbonData-backed table (CREATE TABLE ... AS SELECT).
carbon.sql(
  """
    | CREATE TABLE etl_genotypes_ac_carbon
    | STORED AS carbondata
    | AS SELECT * FROM etg_genotypes_ac_ext
  """.stripMargin)
// Pre-aggregate datamap: per-variant sum of al_cnt, keyed by (chr, pos, posend, ref, alt).
// `alt` is a GROUP BY key, so it is also projected; the allele-frequency queries below
// select and group by all five key columns.
carbon.sql(
  """
    | CREATE DATAMAP agg_alleles
    | ON TABLE etl_genotypes_ac_carbon
    | USING "preaggregate"
    | AS
    | SELECT chr, pos, posend, ref, alt, sum(al_cnt) as sum_al
    | FROM etl_genotypes_ac_carbon
    | GROUP BY chr, pos, posend, ref, alt
  """.stripMargin)
// Datamap build time reported: "opTime":"272850 ms" (~4.5 min)

// DISTINCT aggregates are not supported by the preaggregate datamap, so the sample-count
// datamap below could not be created; kept for reference only.
// carbon.sql(
//   """
//     | CREATE DATAMAP agg_samples
//     | ON TABLE etl_genotypes_ac_carbon
//     | USING "preaggregate"
//     | AS
//     | SELECT count (distinct sample_id) as sample_count
//     | FROM etl_genotypes_ac_carbon
//   """.stripMargin)
// Show the plan of the per-variant `af` query:
// af = sum of al_cnt per variant / number of distinct sample_id values (presumably allele frequency).
carbon.sql(
  """
    | SELECT chr, pos, posend, ref, alt, sum_al/(SELECT count (distinct sample_id) as s_cnt from etl_genotypes_ac_carbon) as af
    | FROM
    | (SELECT chr, pos, posend, ref, alt, sum(al_cnt) as sum_al from etl_genotypes_ac_carbon group by chr, pos, posend, ref, alt) alleles
  """.stripMargin).explain()
// The inner GROUP BY is served from the datamap table (`..._agg_alleles`), not the base table:
// +- *(1) FileScan carbondata igap_dev.etl_genotypes_ac_carbon_agg_alleles[etl_genotypes_ac_carbon_chr#2161,etl_genotypes_ac_carbon_pos#2162,etl_genotypes_ac_carbon_posend#2163,etl_genotypes_ac_carbon_ref#2164,etl_genotypes_ac_carbon_al_cnt_sum#2165L,etl_genotypes_ac_carbon_alt#2166] ReadSchema: struct<etl_genotypes_ac_carbon_al_cnt_sum:bigint,etl_genotypes_ac_carbon_alt:string,etl_genotypes...
// Run the per-variant `af` query on the Carbon table and count the resulting rows.
carbon.sql(
  """
    | SELECT chr, pos, posend, ref, alt, sum_al/(SELECT count (distinct sample_id) as s_cnt from etl_genotypes_ac_carbon) as af
    | FROM
    | (SELECT chr, pos, posend, ref, alt, sum(al_cnt) as sum_al from etl_genotypes_ac_carbon group by chr, pos, posend, ref, alt) alleles
  """.stripMargin).count()
// Same count as above, wrapped in spark.time to measure wall-clock time on the Carbon table.
spark.time(carbon.sql(
  """
    | SELECT chr, pos, posend, ref, alt, sum_al/(SELECT count (distinct sample_id) as s_cnt from etl_genotypes_ac_carbon) as af
    | FROM
    | (SELECT chr, pos, posend, ref, alt, sum(al_cnt) as sum_al from etl_genotypes_ac_carbon group by chr, pos, posend, ref, alt) alleles
  """.stripMargin).count())
// Time taken: 14620 ms = ~14s
// res25: Long = 22064623
// Point lookup: `af` for a single variant (chr 1, pos/posend 13483), timed.
spark.time(carbon.sql(
  """
    | SELECT chr, pos, posend, ref, alt, sum_al/(SELECT count (distinct sample_id) as s_cnt from etl_genotypes_ac_carbon) as af
    | FROM
    | (SELECT chr, pos, posend, ref, alt, sum(al_cnt) as sum_al from etl_genotypes_ac_carbon group by chr, pos, posend, ref, alt) alleles
    | WHERE CHR='1' AND POS=13483 AND POSEND=13483
  """.stripMargin).show())
// Output:
// +---+-----+------+---+---+--------------------+
// |chr|  pos|posend|ref|alt|                  af|
// +---+-----+------+---+---+--------------------+
// |  1|13483| 13483|  G|  C|0.006329113924050633|
// +---+-----+------+---+---+--------------------+
// Time taken: 14046 ms
// Plan of the single-variant lookup. The WHERE predicates are pushed down onto the datamap scan.
// The scalar subquery (count distinct sample_id) still scans the base table.
// NOTE(review): this subquery likely dominates the ~14 s runtime (similar to the unfiltered count);
// precomputing the sample count once could help — confirm by timing it separately.
carbon.sql(
  """
    | SELECT chr, pos, posend, ref, alt, sum_al/(SELECT count (distinct sample_id) as s_cnt from etl_genotypes_ac_carbon) as af
    | FROM
    | (SELECT chr, pos, posend, ref, alt, sum(al_cnt) as sum_al from etl_genotypes_ac_carbon group by chr, pos, posend, ref, alt) alleles
    | WHERE CHR='1' AND POS=13483 AND POSEND=13483
  """.stripMargin).explain()
// == Physical Plan ==
// *(2) HashAggregate(keys=[etl_genotypes_ac_carbon_chr#2161, etl_genotypes_ac_carbon_pos#2162, etl_genotypes_ac_carbon_posend#2163, etl_genotypes_ac_carbon_ref#2164, etl_genotypes_ac_carbon_alt#2166], functions=[sum(etl_genotypes_ac_carbon_al_cnt_sum#2165L)])
// :  +- Subquery subquery2685
// :     +- *(3) HashAggregate(keys=[], functions=[count(distinct sample_id#2039)])
// :        +- Exchange SinglePartition
// :           +- *(2) HashAggregate(keys=[], functions=[partial_count(distinct sample_id#2039)])
// :              +- *(2) HashAggregate(keys=[sample_id#2039], functions=[])
// :                 +- Exchange hashpartitioning(sample_id#2039, 200)
// :                    +- *(1) HashAggregate(keys=[sample_id#2039], functions=[])
// :                       +- *(1) FileScan carbondata igap_dev.etl_genotypes_ac_carbon[sample_id#2039] ReadSchema: struct<sample_id:string>
// +- Exchange hashpartitioning(etl_genotypes_ac_carbon_chr#2161, etl_genotypes_ac_carbon_pos#2162, etl_genotypes_ac_carbon_posend#2163, etl_genotypes_ac_carbon_ref#2164, etl_genotypes_ac_carbon_alt#2166, 200)
//    +- *(1) HashAggregate(keys=[etl_genotypes_ac_carbon_chr#2161, etl_genotypes_ac_carbon_pos#2162, etl_genotypes_ac_carbon_posend#2163, etl_genotypes_ac_carbon_ref#2164, etl_genotypes_ac_carbon_alt#2166], functions=[partial_sum(etl_genotypes_ac_carbon_al_cnt_sum#2165L)])
//       +- *(1) Filter (((((isnotnull(etl_genotypes_ac_carbon_posend#2163) && isnotnull(etl_genotypes_ac_carbon_pos#2162)) && isnotnull(etl_genotypes_ac_carbon_chr#2161)) && (etl_genotypes_ac_carbon_chr#2161 = 1)) && (etl_genotypes_ac_carbon_pos#2162 = 13483)) && (etl_genotypes_ac_carbon_posend#2163 = 13483))
//          +- *(1) FileScan carbondata igap_dev.etl_genotypes_ac_carbon_agg_alleles[etl_genotypes_ac_carbon_chr#2161,etl_genotypes_ac_carbon_pos#2162,etl_genotypes_ac_carbon_posend#2163,etl_genotypes_ac_carbon_ref#2164,etl_genotypes_ac_carbon_al_cnt_sum#2165L,etl_genotypes_ac_carbon_alt#2166] PushedFilters: [IsNotNull(etl_genotypes_ac_carbon_posend), IsNotNull(etl_genotypes_ac_carbon_pos), IsNotNull(etl..., ReadSchema: struct<etl_genotypes_ac_carbon_al_cnt_sum:bigint,etl_genotypes_ac_carbon_alt:string,etl_genotypes...
// Baseline: the same `af` count through the plain SparkSession (`sql` is pre-imported by spark-shell)
// against the external, non-Carbon table `etg_genotypes_ac_ext`.
sql("use igap_dev")
spark.time(sql(
  """
    | SELECT chr, pos, posend, ref, alt, sum_al/(SELECT count (distinct sample_id) as s_cnt from etg_genotypes_ac_ext) as af
    | FROM
    | (SELECT chr, pos, posend, ref, alt, sum(al_cnt) as sum_al from etg_genotypes_ac_ext group by chr, pos, posend, ref, alt) alleles
  """.stripMargin).count())
// Time taken: 337621 ms = ~5.6 min
// res26: Long = 22064623
// Same query run in beeline (Hive) against `etg_genotypes_ac`:
// select count (*) from (SELECT chr, pos, posend, ref, alt, sum_al/(SELECT count (distinct sample_id) as s_cnt from etg_genotypes_ac) as af
//   FROM
//   (SELECT chr, pos, posend, ref, alt, sum(al_cnt) as sum_al from etg_genotypes_ac group by chr, pos, posend, ref, alt) alleles);
// Runtime: > 5.5 min
//
// Definition of the external table used above. al_cnt counts how many times `alt` appears in the
// genotype string `gt`: 0 for ref|ref, 1 for ref|alt or alt|ref, 2 for alt|alt. A single allele
// gives 0 for ref and 1 for alt; anything else gives 0.
// NOTE(review): only '|'-separated genotypes are matched; a '/'-separated genotype (e.g. ref/alt)
// would fall through to `else 0` — confirm the input never uses '/'.
// create external table etg_genotypes_ac_ext as select *, case gt
//   when ref||'|'||ref then 0
//   when ref||'|'||alt then 1
//   when alt||'|'||ref then 1
//   when alt||'|'||alt then 2
//   when ref then 0
//   when alt then 1
//   else 0
// end as al_cnt from etl_genotypes;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment