Yaroslav Tkachenko (sap1ens)

sap1ens / IcebergDemo.java
Created March 21, 2025 23:35
Flink / Iceberg connecting to Confluent Tableflow via REST Catalog
package com.example.demo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
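The preview above cuts off after the imports. As a minimal, hypothetical sketch of the pattern the description refers to (not the gist's actual body), an Iceberg REST catalog can be loaded from Flink via CatalogLoader.custom; every endpoint, credential, namespace and table value below is a placeholder assumption, and java.util.Map/HashMap plus org.apache.iceberg.Table are needed on top of the imports shown:

// Hypothetical sketch: load an Iceberg table through a REST catalog such as Confluent Tableflow.
// All values in angle brackets are placeholders, not real Tableflow endpoints.
Map<String, String> props = new HashMap<>();
props.put("uri", "https://<tableflow-rest-endpoint>"); // REST catalog endpoint (placeholder)
props.put("warehouse", "<warehouse>");                 // placeholder
props.put("credential", "<api-key>:<api-secret>");     // placeholder

CatalogLoader loader = CatalogLoader.custom(
    "tableflow", props, new Configuration(), "org.apache.iceberg.rest.RESTCatalog");
Catalog catalog = loader.loadCatalog();
Table table = catalog.loadTable(TableIdentifier.of(Namespace.of("<database>"), "<table>"));
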
sap1ens / compile-perf.sh
Last active December 31, 2024 01:11
Compile perf from scratch with a specific kernel version
# Build dependencies for perf
apt-get update
apt-get install -y git make gcc flex bison \
  libunwind8-dev libdwarf-dev libelf-dev libdw-dev systemtap-sdt-dev \
  libssl-dev libslang2-dev binutils-dev libzstd-dev libbabeltrace-dev \
  libiberty-dev libnuma-dev libcap-dev

# Treeless, shallow clone of the stable kernel tree at the desired tag, without a checkout
git clone -b v5.10.205 --single-branch -n --depth=1 --filter=tree:0 \
  git://git.kernel.org/pub/scm/linux/kernel/git/stable/linux-stable.git
cd linux-stable

# Materialize only tools/ and scripts/, which is all perf needs to build
git sparse-checkout set --no-cone tools scripts
git checkout

# Build perf (assumption: the preview ends at the checkout; this is the standard build step)
make -C tools/perf

sap1ens / dedup.scala
Created July 17, 2023 15:54
Flink DataStream API deduplication in Scala
val dedupColumn = "..." // column name to use as a key for deduplication
val ttl = Some(Time.minutes(60)) // state TTL
stream
  .keyBy(row => row.getField(dedupColumn))
  .flatMap(new RichFlatMapFunction[Row, Row] {
    @transient private var seen: ValueState[java.lang.Boolean] = _
    override def open(parameters: Configuration): Unit = {
      val descriptor = new ValueStateDescriptor("seen", classOf[java.lang.Boolean])
      ttl.foreach(t => descriptor.enableTimeToLive(StateTtlConfig.newBuilder(t).build())) // expire dedup state after the TTL
      seen = getRuntimeContext.getState(descriptor)
    }
    override def flatMap(row: Row, out: Collector[Row]): Unit = {
      if (seen.value() == null) { // first record for this key within the TTL window
        seen.update(true)
        out.collect(row)
      }
    }
  })

val records: DataStream[Record] = …
val fileSink: SinkFunction[Record] = …
records.keyBy(_.partition).addSink(fileSink)

Heap profile (pprof text output; columns: flat bytes, flat %, running sum %, cumulative bytes, cumulative %, function):
Total: 6743323999 B
5110634379 75.8% 75.8% 5110634379 75.8% rocksdb::UncompressBlockContentsForCompressionType
1438188896 21.3% 97.1% 1438188896 21.3% os::malloc@bf9490
 134350858  2.0% 99.1%  134350858  2.0% rocksdb::Arena::AllocateNewBlock
  22944600  0.3% 99.4%   27545943  0.4% rocksdb::LRUCacheShard::Insert
  22551264  0.3% 99.8% 5133185644 76.1% rocksdb::BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator
   4732478  0.1% 99.9%    4732478  0.1% rocksdb::LRUHandleTable::Resize
   2845444  0.0% 99.9%    2845444  0.0% std::string::_Rep::_S_create
   1333241  0.0% 99.9%    1333241  0.0% inflate
   1310779  0.0% 99.9%    1310779  0.0% rocksdb::WritableFileWriter::Append

import org.apache.flink.contrib.streaming.state.{ConfigurableRocksDBOptionsFactory, RocksDBOptionsFactory}
import org.rocksdb.DBOptions
import org.rocksdb.ColumnFamilyOptions
import org.rocksdb.BlockBasedTableConfig
import org.apache.flink.configuration.ReadableConfig
import java.util

class NoBlockCacheRocksDbOptionsFactory extends ConfigurableRocksDBOptionsFactory {
  override def createDBOptions(currentOptions: DBOptions, handlesToClose: util.Collection[AutoCloseable]): DBOptions = {
    currentOptions // DB-level options are left unchanged
  }
  override def createColumnFamilyOptions(currentOptions: ColumnFamilyOptions, handlesToClose: util.Collection[AutoCloseable]): ColumnFamilyOptions = {
    // disable the RocksDB block cache entirely
    currentOptions.setTableFormatConfig(new BlockBasedTableConfig().setNoBlockCache(true))
  }
  override def configure(configuration: ReadableConfig): RocksDBOptionsFactory = this
}
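
To activate the factory, Flink's RocksDB state backend can be pointed at it through configuration; a minimal sketch, assuming the class is on the job classpath (the com.example package below is a placeholder for wherever the class actually lives):

state.backend.rocksdb.options-factory: com.example.NoBlockCacheRocksDbOptionsFactory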

# Run Datadog's DogStatsD agent in Docker, listening for StatsD metrics on UDP port 8125
docker run -d --name dogstatsd -h `hostname` -p 8125:8125/udp -e API_KEY=$API_KEY datadog/docker-dogstatsd