Yaroslav Tkachenko sap1ens

// Key the stream by partition so that all records for the same partition
// are written by the same parallel instance of the file sink
val records: DataStream[Record] = …
val fileSink: SinkFunction[Record] = …
records.keyBy(_.partition).addSink(fileSink)
@sap1ens
sap1ens / dedup.scala
Created July 17, 2023 15:54
Flink DataStream API deduplication in Scala
val dedupColumn = "..." // column name to use as a key for deduplication
val ttl = Some(Time.minutes(60)) // state TTL
stream
  .keyBy(row => row.getField(dedupColumn))
  .flatMap(new RichFlatMapFunction[Row, Row] {
    @transient
    private var seen: ValueState[Boolean] = _

    override def open(parameters: Configuration): Unit = {
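      // Sketch of one way to finish the operator: the state descriptor name "seen"
      // and the TTL wiring are assumptions, not taken from the original gist
      val descriptor = new ValueStateDescriptor[Boolean]("seen", classOf[Boolean])
      ttl.foreach(t => descriptor.enableTimeToLive(StateTtlConfig.newBuilder(t).build()))
      seen = getRuntimeContext.getState(descriptor)
    }

    override def flatMap(row: Row, out: Collector[Row]): Unit = {
      // an unset ValueState unboxes to false in Scala, so the first row per key is emitted
      if (!seen.value()) {
        seen.update(true)
        out.collect(row)
      }
    }
  })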
@sap1ens
sap1ens / compile-perf.sh
Last active December 31, 2024 01:11
Compile perf from scratch with a specific kernel version
# Build dependencies for perf (compiler, parser generators, and optional feature libraries)
apt-get update
apt-get install -y git make gcc flex bison \
  libunwind8-dev libdwarf-dev libelf-dev libdw-dev systemtap-sdt-dev \
  libssl-dev libslang2-dev binutils-dev libzstd-dev libbabeltrace-dev \
  libiberty-dev libnuma-dev libcap-dev

# Shallow, treeless clone of the stable kernel tree at the desired tag;
# -n skips the initial checkout so sparse-checkout can limit it first
git clone -b v5.10.205 --single-branch -n --depth=1 --filter=tree:0 \
  git://git.kernel.org/pub/scm/linux/kernel/git/stable/linux-stable.git
cd linux-stable

# Only materialize the directories needed to build perf
git sparse-checkout set --no-cone tools scripts
git checkout
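
# Assumed final step (not part of the snippet above): build perf in-tree,
# assuming the sparse checkout provides everything the build needs
make -C tools/perf
# the resulting binary lands at tools/perf/perf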
@sap1ens
sap1ens / IcebergDemo.java
Created March 21, 2025 23:35
Flink / Iceberg connecting to Confluent Tableflow via REST Catalog
package com.example.demo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
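// A sketch of how the rest of the demo might look. The endpoint, credential,
// warehouse, namespace, and table names below are placeholders (assumptions, not
// taken from the gist). Tableflow exposes an Iceberg REST catalog, so the generic
// RESTCatalog implementation is loaded through CatalogLoader.custom here.
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

import java.util.Map;

public class IcebergDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Iceberg REST catalog properties; "credential" is the client id:secret pair
        // used for the OAuth2 client-credentials flow
        Map<String, String> props = Map.of(
                "uri", "https://<tableflow-rest-endpoint>/iceberg",  // placeholder
                "credential", "<api-key>:<api-secret>",              // placeholder
                "warehouse", "<warehouse-id>");                      // placeholder

        CatalogLoader catalogLoader = CatalogLoader.custom(
                "tableflow", props, new Configuration(), "org.apache.iceberg.rest.RESTCatalog");

        // Sanity check: list the tables visible in one namespace
        Catalog catalog = catalogLoader.loadCatalog();
        System.out.println(catalog.listTables(Namespace.of("<namespace>")));  // placeholder

        // Read a table as a bounded stream of RowData and print it
        TableLoader tableLoader = TableLoader.fromCatalog(
                catalogLoader, TableIdentifier.of("<namespace>", "<table>"));  // placeholders
        DataStream<RowData> rows = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();
        rows.print();

        env.execute("IcebergDemo");
    }
}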