@JorgenRingen
Created February 11, 2021 16:10
Set topic config for internal changelog topics in Kafka Streams
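import java.time.Duration
import org.apache.kafka.common.config.TopicConfig.*
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.state.Stores
// Assumption: JsonSerde is Spring Kafka's; OrderV2 is the application's own value type.
import org.springframework.kafka.support.serializer.JsonSerde

// State store registered directly on the builder (for use from the Processor API).
streamsBuilder.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("storeName"),
        Serdes.StringSerde(),
        JsonSerde<OrderV2>()
    )
        .withCachingEnabled()
        // Topic-level overrides for the store's internal changelog topic:
        // 30-day retention, cleanup policy "compact,delete".
        .withLoggingEnabled(
            mapOf(
                RETENTION_MS_CONFIG to Duration.ofDays(30).toMillis().toString(),
                CLEANUP_POLICY_CONFIG to "$CLEANUP_POLICY_COMPACT,$CLEANUP_POLICY_DELETE"
            )
        )
)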
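
import java.time.Duration
import org.apache.kafka.common.config.TopicConfig.*
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.KeyValueStore

// Same changelog topic overrides, set through Materialized for a DSL table.
val materialized = Materialized.`as`<String, String, KeyValueStore<Bytes, ByteArray>>("storeName")
    .withLoggingEnabled(
        mapOf(
            RETENTION_MS_CONFIG to Duration.ofDays(30).toMillis().toString(),
            CLEANUP_POLICY_CONFIG to "$CLEANUP_POLICY_COMPACT,$CLEANUP_POLICY_DELETE"
        )
    )
streamsBuilder.table("foo", materialized)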