Iceberg read from executor
import com.google.common.collect.Maps
import org.apache.iceberg.TableProperties
import org.apache.iceberg.data.IcebergGenerics
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil.convert
import org.apache.spark.TaskContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.col
import org.scalatest.BeforeAndAfterAll
import org.scalatest.DoNotDiscover
import org.scalatest.FunSuite

import java.sql.Timestamp.valueOf
import java.time.LocalDateTime
import scala.collection.JavaConverters._
import scala.reflect.io.Directory
import scala.reflect.io.Path

// Brings sliceConsecutive into scope; IteratorSlicing is the (assumed) enclosing object
// added around the SliceBySubsequence helper defined below.
import IteratorSlicing._
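
// Reproduces enriching a Delta dataset with columns read from a partitioned Iceberg v2 table
// directly inside executor tasks (mapPartitions).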
@DoNotDiscover
class IcebergMergeOnRead extends FunSuite with BeforeAndAfterAll {

  private val testPath = "/tmp/iceberg_cdc_test"
  private val icebergTable = "hdl.enrichment_table"
  private val icebergCatalogPath = s"$testPath/iceberg_catalog"
  private val dataPath = s"$testPath/datalake"
  private val timestamp: LocalDateTime = LocalDateTime.now().minusDays(1)

  // Local Spark session with the Iceberg SQL extensions and a Hadoop-type Iceberg catalog
  // registered as spark_catalog, using icebergCatalogPath as the warehouse location.
  val sparkSession: SparkSession = SparkSession.builder()
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", icebergCatalogPath)
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .master("local")
    .getOrCreate()

  import sparkSession.implicits._

  override def beforeAll(): Unit = {
    sparkSession.sql("CREATE NAMESPACE IF NOT EXISTS hdl")
    sparkSession.sql(s"DROP TABLE IF EXISTS $icebergTable") // DROP does not work, so the test directory is removed as well
    Directory(Path(testPath)).deleteRecursively()
  }
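
  // Writes Iceberg rows for ids 10..29 and 40..89, plus a Delta dataset with ids 0..99
  // that is enriched from the Iceberg table in the second test.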
test("Create iceberg table and parquet to merge") { | |
val icebergEnricherData = ((10 until 30) ++ (40 until 90)).map(id => (id, s"dataIceberg$id", valueOf(timestamp))) | |
val dataToEnrich = (0 until 100).map(id => (id, "data", valueOf(timestamp))) | |
sparkSession.sql(s"""CREATE TABLE $icebergTable ( | |
| id bigint, | |
| icebergData string, | |
| icebergTs timestamp) | |
|USING iceberg | |
|PARTITIONED BY (truncate(10,id), days(icebergTs)) | |
|TBLPROPERTIES ( | |
|'${TableProperties.FORMAT_VERSION}'='2', | |
|'${TableProperties.OBJECT_STORE_ENABLED}'=true, | |
|'${TableProperties.OBJECT_STORE_PATH}'='$testPath/iceberg_data/' | |
|)""".stripMargin) | |
icebergEnricherData | |
.toDF("id", "icebergData", "icebergTs") | |
.sortWithinPartitions($"id", $"icebergTs") | |
.writeTo(icebergTable) | |
.append() | |
dataToEnrich | |
.toDF("id", "data", "ts") | |
      .repartition(4, col("id")) // This is the problem: hash repartitioning scatters ids, so partitions hold non-consecutive id ranges
      .write
      .format("delta")
      .mode(SaveMode.Overwrite)
      .save(dataPath)
    sparkSession.read.format("iceberg").load(icebergTable).show(numRows = 100)
  }
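
  // Reads the Iceberg table directly inside mapPartitions on the executors, scanning only
  // the consecutive id range held by each input partition.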
test("Read iceberg table from executor") { | |
val tables = new HadoopTables(sparkSession.sessionState.newHadoopConf()) | |
val table = tables.load(s"$icebergCatalogPath/${icebergTable.replace('.', '/')}") | |
val enrichmentColumns = List("icebergData") | |
val enrichmentColumnSchema = table.schema().select(enrichmentColumns: _*) | |
val inputDF = sparkSession | |
.read | |
.format("delta") | |
.load(dataPath) | |
val rowEncoder = RowEncoder(convert(enrichmentColumnSchema).fields.foldLeft(inputDF.schema)((s, f) => s.add(f))) | |
val enricherData = inputDF | |
.sort("id") // main issue to avoid this sort and still do not have too many ranges | |
      .coalesce(4)
      .mapPartitions { it: Iterator[Row] =>
        // hash right inner join
        it
          .sliceConsecutive(_.getAs[Integer]("id").toLong)
          .flatMap { range =>
            val rowToLong: com.google.common.base.Function[Row, Long] =
              (r: Row) => r.getAs[Integer]("id").toLong
            val hashedRows = Maps.uniqueIndex(range.asJava, rowToLong)
            val endBorder = hashedRows.keySet().asScala.max
            val startBorder = hashedRows.keySet().asScala.min
            val scanTasks = IcebergGenerics.read(table)
              .select(("id" :: enrichmentColumns): _*) // "id" has to be projected too, it is needed for the lookup below
              .where(Expressions.and(
                Expressions.greaterThanOrEqual("id", startBorder),
                Expressions.lessThanOrEqual("id", endBorder) // endBorder is the largest id in the slice, so the bound is inclusive
              )).build()
            try scanTasks
              .asScala
              .map { record =>
                val inputRow = hashedRows.get(record.getField("id"))
                Row.fromSeq(inputRow.toSeq ++ enrichmentColumns.map(record.getField)) // TODO type converter
              } finally scanTasks.close()
          }
      }(rowEncoder)
    enricherData.show(numRows = 100, truncate = false)
  }
}
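
For reference, a minimal sketch of the build dependencies this test relies on. The artifact coordinates are the usual Maven ones, but the versions, the Scala version, and whether the single Iceberg runtime bundle is enough for IcebergGenerics and HadoopTables are assumptions to adjust to your environment:

// build.sbt sketch -- versions are placeholders, adjust to your Spark/Iceberg setup
scalaVersion := "2.12.14"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"              % "3.1.2" % Provided,
  // iceberg-core / iceberg-data may be needed explicitly depending on how the runtime bundle is packaged
  "org.apache.iceberg" % "iceberg-spark3-runtime" % "0.12.0",
  "io.delta"          %% "delta-core"             % "1.0.0",
  "com.google.guava"   % "guava"                  % "30.1.1-jre",
  "org.scalatest"     %% "scalatest"              % "3.2.9" % Test
)
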
import scala.collection.AbstractIterator

// Implicit classes cannot be top-level in Scala 2, so the helper is wrapped in an (assumed)
// enclosing object here; the original may have placed it in a package object instead.
object IteratorSlicing {

  implicit class SliceBySubsequence[T, K](it: Iterator[T]) extends Serializable {

    /**
     * @param keyExtractor The function used to extract the key from an iterator entry.
     * @return An iterator of subsequences (iterators) that share the same key.
     *
     * Note: Reuse: After calling this method, one should discard the iterator it was called on,
     * and use only the iterator that was returned. Using the old iterator is undefined, subject to change,
     * and may result in changes to the new iterator as well.
     */
    def sliceBy(keyExtractor: T => K): Iterator[Iterator[T]] = new AbstractIterator[Iterator[T]] {
      private var bufferedIt = it.buffered
      def hasNext: Boolean = bufferedIt.hasNext
      def next(): Iterator[T] =
        bufferedIt.headOption match {
          case Some(hd) =>
            val (subsequence, rest) = bufferedIt.span(r => keyExtractor(r) == keyExtractor(hd))
            bufferedIt = rest.buffered
            subsequence
          case None =>
            Iterator.empty
        }
    }

    /**
     * @param extractSequentialId The function used to extract a sequential id from an iterator entry.
     * @return An iterator of subsequences (iterators), sliced into consecutive id ranges.
     *
     * Note: Reuse: After calling this method, one should discard the iterator it was called on,
     * and use only the iterator that was returned. Using the old iterator is undefined, subject to change,
     * and may result in changes to the new iterator as well.
     */
    def sliceConsecutive(extractSequentialId: T => Long): Iterator[Iterator[T]] = new AbstractIterator[Iterator[T]] {
      var iteratorToSlice: Iterator[T] = it
      override def hasNext: Boolean = iteratorToSlice.hasNext
      override def next(): Iterator[T] = {
        var nextIdInSequence: Option[Long] = None
        val (consecutiveSubSequence, rest) = iteratorToSlice.span { row =>
          val isSequential = extractSequentialId(row) == nextIdInSequence.getOrElse(extractSequentialId(row))
          if (isSequential) nextIdInSequence = Some(extractSequentialId(row) + 1)
          else nextIdInSequence = None
          isSequential
        }
        iteratorToSlice = rest
        consecutiveSubSequence
      }
    }
  }
}
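
A minimal sketch of how the slicing helper behaves, assuming the implicit class is in scope (here via the IteratorSlicing wrapper object assumed above). sliceConsecutive splits an iterator at every gap in the extracted ids:

// Hypothetical standalone check of sliceConsecutive -- not part of the original gist.
object SliceConsecutiveExample extends App {
  import IteratorSlicing._

  val ids = Iterator(1L, 2L, 3L, 7L, 8L, 20L)

  // Consume each inner iterator before asking for the next one (see the reuse note above);
  // .map(_.toList).toList does exactly that.
  val ranges = ids.sliceConsecutive(identity).map(_.toList).toList

  // prints: List(List(1, 2, 3), List(7, 8), List(20))
  println(ranges)
}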