import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
import scala.collection.JavaConverters._
import org.apache.iceberg.PartitionSpec
import org.apache.iceberg.Schema
import org.apache.iceberg.SchemaParser
import org.apache.iceberg.aws.glue.GlueCatalog
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.spark.actions.SparkActions
object GlueApp {
  val bucket = "matanodpcommonstack-matanolakestoragebucketf56b77-clgql6xfvs3r"

  // Properties used to initialize the Iceberg GlueCatalog directly (outside of Spark SQL).
  val icebergProperties = Map(
    "catalog-name" -> "iceberg",
    "catalog-impl" -> "org.apache.iceberg.aws.glue.GlueCatalog",
    "warehouse" -> s"s3://$bucket/lake",
    "io-impl" -> "org.apache.iceberg.aws.s3.S3FileIO"
  )

  val icebergCatalog = {
    val ret = new GlueCatalog()
    ret.initialize("glue_catalog", icebergProperties.asJava)
    ret
  }

  def main(sysArgs: Array[String]): Unit = {
    // Expose the same Glue-backed Iceberg catalog to Spark SQL as `glue_catalog`.
    val conf = new SparkConf()
    conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.glue_catalog.warehouse", s"s3://$bucket/lake")
    conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.iceberg.handle-timestamp-without-timezone", "true")

    val spark: SparkContext = new SparkContext(conf)
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession
    import sparkSession.implicits._

    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Load the Iceberg table to maintain from the Glue catalog.
    val tableName = "aws_cloudtrail"
    val tableId = TableIdentifier.of(Namespace.of("matano"), tableName)
    val table = icebergCatalog.loadTable(tableId)

    // Expire snapshots older than 500 seconds, deleting data files that are no longer referenced.
    val expireTime = System.currentTimeMillis() - 500 * 1000
    SparkActions
      .get(sparkSession)
      .expireSnapshots(table)
      .expireOlderThan(expireTime)
      .execute()

    // Remove files in the table location that are not referenced by any table metadata.
    SparkActions
      .get(sparkSession)
      .deleteOrphanFiles(table)
      .olderThan(expireTime)
      .execute()
  }
}
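
// Note: since spark.sql.extensions registers IcebergSparkSessionExtensions above, the same
// maintenance could alternatively be run through Iceberg's Spark SQL procedures instead of
// SparkActions. A minimal sketch, assuming the table is reachable as
// glue_catalog.matano.aws_cloudtrail (as configured in main) and an illustrative cutoff timestamp:
//
//   sparkSession.sql(
//     "CALL glue_catalog.system.expire_snapshots(" +
//       "table => 'matano.aws_cloudtrail', older_than => TIMESTAMP '2023-05-06 00:00:00')")
//   sparkSession.sql(
//     "CALL glue_catalog.system.remove_orphan_files(table => 'matano.aws_cloudtrail')")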