Created
November 7, 2020 01:25
-
-
Save loganlinn/0a434e780d29a4b13198c39d0afff67a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/olap_api/olap/clickhouse/BUILD b/olap_api/olap/clickhouse/BUILD | |
new file mode 100644 | |
index 0000000..19c4c71 | |
--- /dev/null | |
+++ b/olap_api/olap/clickhouse/BUILD | |
@@ -0,0 +1,14 @@ | |
+load("@io_bazel_rules_kotlin//kotlin:kotlin.bzl", "kt_jvm_library") | |
+ | |
+# Kotlin library target "clickhouse": sources are all .kt files matched recursively by the glob below. | |
+kt_jvm_library( | |
+    name = "clickhouse", | |
+    srcs = glob(["**/*.kt"]), | |
+    visibility = ["//:__subpackages__"], | |
+    deps = [ | |
+        "@maven//:io_github_microutils_kotlin_logging", | |
+        "@maven//:io_micronaut_picocli_micronaut_picocli", | |
+        "@maven//:joda_time_joda_time", | |
+        "@maven//:org_slf4j_slf4j_api", | |
+        "@maven//:ru_yandex_clickhouse_clickhouse_jdbc", | |
+    ], | |
+) | |
\ No newline at end of file | |
diff --git a/olap_api/olap/clickhouse/tools/schemer.kt b/olap_api/olap/clickhouse/tools/schemer.kt | |
new file mode 100644 | |
index 0000000..ae4b39a | |
--- /dev/null | |
+++ b/olap_api/olap/clickhouse/tools/schemer.kt | |
@@ -0,0 +1,2 @@ | |
+package olap.clickhouse.tools | |
+ | |
diff --git a/olap_api/olap/core/BUILD b/olap_api/olap/core/BUILD | |
new file mode 100644 | |
index 0000000..dae87ca | |
--- /dev/null | |
+++ b/olap_api/olap/core/BUILD | |
@@ -0,0 +1,7 @@ | |
+load("@io_bazel_rules_kotlin//kotlin:kotlin.bzl", "kt_jvm_library") | |
+ | |
+# Kotlin library target "core": sources are all .kt files matched recursively by the glob below; no deps declared. | |
+kt_jvm_library( | |
+    name = "core", | |
+    srcs = glob(["**/*.kt"]), | |
+    visibility = ["//:__subpackages__"], | |
+) | |
diff --git a/olap_api/olap/core/dataset.kt b/olap_api/olap/core/dataset.kt | |
new file mode 100644 | |
index 0000000..60f395c | |
--- /dev/null | |
+++ b/olap_api/olap/core/dataset.kt | |
@@ -0,0 +1,2 @@ | |
+package olap.core | |
+ | |
diff --git a/olap_api/olap/lib/serdes/avro/BUILD b/olap_api/olap/lib/serdes/avro/BUILD | |
new file mode 100644 | |
index 0000000..85e62b9 | |
--- /dev/null | |
+++ b/olap_api/olap/lib/serdes/avro/BUILD | |
@@ -0,0 +1,12 @@ | |
+load("@io_bazel_rules_kotlin//kotlin:kotlin.bzl", "kt_jvm_library") | |
+ | |
+# Kotlin library target "avro": only the .kt files directly in this directory (non-recursive glob). | |
+kt_jvm_library( | |
+    name = "avro", | |
+    srcs = glob(["*.kt"]), | |
+    visibility = ["//:__subpackages__"], | |
+    deps = [ | |
+        "@maven//:io_confluent_kafka_schema_serializer", | |
+        "@maven//:io_confluent_kafka_streams_avro_serde", | |
+        "@maven//:org_apache_avro_avro", | |
+    ], | |
+) | |
\ No newline at end of file | |
diff --git a/olap_api/olap/lib/serdes/avro/utils.kt b/olap_api/olap/lib/serdes/avro/utils.kt | |
new file mode 100644 | |
index 0000000..0186ab3 | |
--- /dev/null | |
+++ b/olap_api/olap/lib/serdes/avro/utils.kt | |
@@ -0,0 +1,8 @@ | |
+package olap.lib.serdes.avro | |
+ | |
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde | |
+import org.apache.avro.specific.SpecificRecord | |
+ | |
+/** | |
+ * Creates a [SpecificAvroSerde] for [T] and configures it before handing it back. | |
+ * | |
+ * @param config settings passed unchanged to `SpecificAvroSerde.configure`. | |
+ * @param isForKeys forwarded to `configure` as its second argument. | |
+ * @return the newly created, already configured serde. | |
+ */ | |
+fun <T : SpecificRecord> specificAvroSerde( | |
+    config: Map<String, Any?>, | |
+    isForKeys: Boolean | |
+): SpecificAvroSerde<T> { | |
+    val serde = SpecificAvroSerde<T>() | |
+    serde.configure(config, isForKeys) | |
+    return serde | |
+} | |
+ | |
diff --git a/olap_api/olap/lib/units/BUILD b/olap_api/olap/lib/units/BUILD | |
new file mode 100644 | |
index 0000000..fe10fe1 | |
--- /dev/null | |
+++ b/olap_api/olap/lib/units/BUILD | |
@@ -0,0 +1,7 @@ | |
+load("@io_bazel_rules_kotlin//kotlin:kotlin.bzl", "kt_jvm_library") | |
+ | |
+# Kotlin library target "units": sources are all .kt files matched recursively by the glob below; no deps declared. | |
+kt_jvm_library( | |
+    name = "units", | |
+    srcs = glob(["**/*.kt"]), | |
+    visibility = ["//:__subpackages__"], | |
+) | |
diff --git a/olap_api/olap/lib/units/information.kt b/olap_api/olap/lib/units/information.kt | |
new file mode 100644 | |
index 0000000..be33426 | |
--- /dev/null | |
+++ b/olap_api/olap/lib/units/information.kt | |
@@ -0,0 +1,39 @@ | |
+package olap.lib.units | |
+ | |
+// Size literals: `2.mebibytes`, `512.kibibytes`, ... each produce a BitValue holding the amount in bits. | |
+// Every step multiplies with Math.multiplyExact, so an amount that does not fit in a Long raises | |
+// ArithmeticException instead of silently wrapping around to a wrong (possibly negative) value. | |
+ | |
+/** This number of bits as a [BitValue]. */ | |
+val Long.bits: BitValue get() = BitValue(this) | |
+ | |
+/** This number of bytes (8 bits each) as a [BitValue]. */ | |
+val Long.bytes: BitValue get() = Math.multiplyExact(this, 8L).bits | |
+ | |
+/** This number of kibibytes (1024 bytes each) as a [BitValue]. */ | |
+val Long.kibibytes: BitValue get() = Math.multiplyExact(this, 1024L).bytes | |
+ | |
+/** This number of mebibytes (1024 kibibytes each) as a [BitValue]. */ | |
+val Long.mebibytes: BitValue get() = Math.multiplyExact(this, 1024L).kibibytes | |
+ | |
+/** This number of gibibytes (1024 mebibytes each) as a [BitValue]. */ | |
+val Long.gibibytes: BitValue get() = Math.multiplyExact(this, 1024L).mebibytes | |
+ | |
+/** This number of tebibytes (1024 gibibytes each) as a [BitValue]. */ | |
+val Long.tebibytes: BitValue get() = Math.multiplyExact(this, 1024L).gibibytes | |
+ | |
+/** This number of pebibytes (1024 tebibytes each) as a [BitValue]. */ | |
+val Long.pebibytes: BitValue get() = Math.multiplyExact(this, 1024L).tebibytes | |
+ | |
+// Int overloads widen to Long and delegate to the Long properties above. | |
+ | |
+val Int.bits: BitValue get() = toLong().bits | |
+ | |
+val Int.bytes: BitValue get() = toLong().bytes | |
+ | |
+val Int.kibibytes: BitValue get() = toLong().kibibytes | |
+ | |
+val Int.mebibytes: BitValue get() = toLong().mebibytes | |
+ | |
+val Int.gibibytes: BitValue get() = toLong().gibibytes | |
+ | |
+val Int.tebibytes: BitValue get() = toLong().tebibytes | |
+ | |
+val Int.pebibytes: BitValue get() = toLong().pebibytes | |
+ | |
+/** | |
+ * An amount of information stored as a whole number of bits. | |
+ * | |
+ * Instances are created through the `Long`/`Int` extension properties in this file. | |
+ * Each `toXxx` property converts with integer division, so any remainder is discarded | |
+ * (for example `1023.bytes.toKibibytes` is 0). The conversions are computed on access | |
+ * rather than stored, so an instance holds nothing but [bits]. | |
+ */ | |
+data class BitValue internal constructor(internal val bits: Long) { | |
+    val toBits: Long get() = bits | |
+    val toBytes: Long get() = toBits / 8 | |
+    val toKibibytes: Long get() = toBytes / 1024 | |
+    val toMebibytes: Long get() = toKibibytes / 1024 | |
+    val toGibibytes: Long get() = toMebibytes / 1024 | |
+    val toTebibytes: Long get() = toGibibytes / 1024 | |
+    val toPebibytes: Long get() = toTebibytes / 1024 | |
+} | |
diff --git a/olap_api/olap/lib/units/time.kt b/olap_api/olap/lib/units/time.kt | |
new file mode 100644 | |
index 0000000..582febe | |
--- /dev/null | |
+++ b/olap_api/olap/lib/units/time.kt | |
@@ -0,0 +1,39 @@ | |
+package olap.lib.units | |
+ | |
+// Duration literals: `10.seconds`, `5.minutes`, ... each produce a TimeValue holding the amount in nanoseconds. | |
+// Every step multiplies with Math.multiplyExact, so a duration that does not fit in a Long raises | |
+// ArithmeticException instead of silently wrapping around to a wrong (possibly negative) value. | |
+ | |
+/** This number of nanoseconds as a [TimeValue]. */ | |
+val Long.nanoseconds: TimeValue get() = TimeValue(this) | |
+ | |
+/** This number of microseconds (1000 ns each) as a [TimeValue]. */ | |
+val Long.microseconds: TimeValue get() = Math.multiplyExact(this, 1000L).nanoseconds | |
+ | |
+/** This number of milliseconds (1000 microseconds each) as a [TimeValue]. */ | |
+val Long.milliseconds: TimeValue get() = Math.multiplyExact(this, 1000L).microseconds | |
+ | |
+/** This number of seconds (1000 ms each) as a [TimeValue]. */ | |
+val Long.seconds: TimeValue get() = Math.multiplyExact(this, 1000L).milliseconds | |
+ | |
+/** This number of minutes (60 s each) as a [TimeValue]. */ | |
+val Long.minutes: TimeValue get() = Math.multiplyExact(this, 60L).seconds | |
+ | |
+/** This number of hours (60 min each) as a [TimeValue]. */ | |
+val Long.hours: TimeValue get() = Math.multiplyExact(this, 60L).minutes | |
+ | |
+/** This number of days (24 h each) as a [TimeValue]. */ | |
+val Long.days: TimeValue get() = Math.multiplyExact(this, 24L).hours | |
+ | |
+// Int overloads widen to Long and delegate to the Long properties above. | |
+ | |
+val Int.nanoseconds: TimeValue get() = toLong().nanoseconds | |
+ | |
+val Int.microseconds: TimeValue get() = toLong().microseconds | |
+ | |
+val Int.milliseconds: TimeValue get() = toLong().milliseconds | |
+ | |
+val Int.seconds: TimeValue get() = toLong().seconds | |
+ | |
+val Int.minutes: TimeValue get() = toLong().minutes | |
+ | |
+val Int.hours: TimeValue get() = toLong().hours | |
+ | |
+val Int.days: TimeValue get() = toLong().days | |
+ | |
+/** | |
+ * A duration stored as a whole number of nanoseconds. | |
+ * | |
+ * Instances are created through the `Long`/`Int` extension properties in this file. | |
+ * Each `toXxx` property converts with integer division, so any remainder is discarded | |
+ * (for example `90.minutes.toHours` is 1). The conversions are computed on access | |
+ * rather than stored, so an instance holds nothing but [ns]. | |
+ */ | |
+data class TimeValue internal constructor(internal val ns: Long) { | |
+    val toNanoseconds: Long get() = ns | |
+    val toMicroseconds: Long get() = toNanoseconds / 1000 | |
+    val toMilliseconds: Long get() = toMicroseconds / 1000 | |
+    val toSeconds: Long get() = toMilliseconds / 1000 | |
+    val toMinutes: Long get() = toSeconds / 60 | |
+    val toHours: Long get() = toMinutes / 60 | |
+    val toDays: Long get() = toHours / 24 | |
+} | |
\ No newline at end of file | |
diff --git a/olap_api/olap/stream/conversions_sink/BUILD b/olap_api/olap/stream/conversions_sink/BUILD | |
new file mode 100644 | |
index 0000000..8a6cf75 | |
--- /dev/null | |
+++ b/olap_api/olap/stream/conversions_sink/BUILD | |
@@ -0,0 +1,32 @@ | |
+load("@io_bazel_rules_kotlin//kotlin:kotlin.bzl", "kt_jvm_library") | |
+load("@rules_java//java:defs.bzl", "java_binary") | |
+ | |
+package(default_visibility = ["//:__subpackages__"]) | |
+ | |
+# Kotlin library holding the conversions sink sources (all .kt files matched recursively by the glob below). | |
+kt_jvm_library( | |
+    name = "conversions_sink_lib", | |
+    srcs = glob(["**/*.kt"]), | |
+    visibility = ["//:__subpackages__"], | |
+    deps = [ | |
+        # In-repo targets. | |
+        "//olap/lib/units", | |
+        "//olap/lib/serdes/avro", | |
+        "//olap/datasets/optimizely/v1:enriched_conversion_java_proto", | |
+        # External Maven artifacts. | |
+        "@maven//:com_optimizely_avro_schemas_avro_schemas_export", | |
+        "@maven//:io_confluent_kafka_schema_registry_client", | |
+        "@maven//:io_confluent_kafka_schema_serializer", | |
+        "@maven//:io_confluent_kafka_streams_avro_serde", | |
+        "@maven//:io_github_microutils_kotlin_logging", | |
+        "@maven//:io_micronaut_picocli_micronaut_picocli", | |
+        "@maven//:org_apache_avro_avro", | |
+        "@maven//:org_apache_kafka_kafka_streams", | |
+        "@maven//:org_slf4j_slf4j_api", | |
+        "@maven//:ru_yandex_clickhouse_clickhouse_jdbc", | |
+    ], | |
+) | |
+ | |
+# Runnable entry point for the conversions sink. | |
+# main_class has to match the `package olap.stream.conversions_sink` declaration in ConversionsSink.kt; | |
+# the former "olap.stream.clickhouse_sink" prefix named a package that contains no such class. | |
+java_binary( | |
+    name = "conversions_sink", | |
+    main_class = "olap.stream.conversions_sink.ConversionsSink", | |
+    visibility = ["//visibility:public"], | |
+    runtime_deps = [":conversions_sink_lib"], | |
+) | |
diff --git a/olap_api/olap/stream/conversions_sink/ConversionsSink.kt b/olap_api/olap/stream/conversions_sink/ConversionsSink.kt | |
new file mode 100644 | |
index 0000000..bcd03a0 | |
--- /dev/null | |
+++ b/olap_api/olap/stream/conversions_sink/ConversionsSink.kt | |
@@ -0,0 +1,177 @@ | |
+package olap.stream.conversions_sink | |
+ | |
+import com.optimizely.export.enrich.model.EnrichedEventAvro | |
+import com.optimizely.export.enrich.model.LayerStateAvro | |
+import com.optimizely.export.enrich.model.SegmentAvro | |
+import io.confluent.kafka.serializers.subject.RecordNameStrategy | |
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde | |
+import mu.KotlinLogging | |
+import olap.datasets.conversions.v1.EnrichedConversion | |
+import olap.lib.serdes.avro.specificAvroSerde | |
+import olap.lib.units.kibibytes | |
+import olap.lib.units.mebibytes | |
+import olap.lib.units.seconds | |
+import org.apache.kafka.clients.consumer.ConsumerConfig | |
+import org.apache.kafka.clients.producer.ProducerConfig | |
+import org.apache.kafka.common.config.AbstractConfig | |
+import org.apache.kafka.common.config.ConfigDef | |
+import org.apache.kafka.common.serialization.Serdes | |
+import org.apache.kafka.streams.KafkaStreams | |
+import org.apache.kafka.streams.StreamsBuilder | |
+import org.apache.kafka.streams.StreamsConfig | |
+import org.apache.kafka.streams.Topology | |
+import org.apache.kafka.streams.kstream.Consumed | |
+import org.apache.kafka.streams.kstream.KStream | |
+import org.apache.kafka.streams.kstream.ValueMapper | |
+import picocli.CommandLine | |
+import picocli.CommandLine.Command | |
+import picocli.CommandLine.Parameters | |
+import java.nio.file.Path | |
+import java.util.* | |
+import java.util.concurrent.Callable | |
+import java.util.function.UnaryOperator | |
+import kotlin.system.exitProcess | |
+ | |
+private val logger = KotlinLogging.logger {} | |
+ | |
+/** Command name shown by the CLI and first half of the Kafka Streams application id. */ | |
+const val APPLICATION_NAME = "clickhouse-sink" | |
+ | |
+/** Version reported by the CLI and second half of the Kafka Streams application id. */ | |
+const val APPLICATION_VERSION = "1" | |
+ | |
+/** Value used for `application.id`: "<name>-<version>". */ | |
+const val APPLICATION_ID = "$APPLICATION_NAME-$APPLICATION_VERSION" | |
+ | |
+/** | |
+ * Settings applied before the user's .properties file is loaded, so the file can override any of them. | |
+ * | |
+ * Byte sizes are converted with `toInt()` because Kafka declares those settings as INT and its | |
+ * ConfigDef only accepts an `Integer` (or a `String`) for INT-typed keys; a `Long` is rejected with | |
+ * a ConfigException. `buffer.memory` is declared as LONG and therefore stays a `Long`. | |
+ */ | |
+private val CONFIG_DEFAULTS: Map<String, Any> = mapOf( | |
+    StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String()::class.java, | |
+    StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG to SpecificAvroSerde::class.java, | |
+    "schema.registry.url" to "http://schema-registry", | |
+    "key.subject.name.strategy" to RecordNameStrategy::class.java, | |
+    "value.subject.name.strategy" to RecordNameStrategy::class.java, | |
+    "auto.register.schemas" to false, | |
+    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", | |
+    ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG to 2.mebibytes.toBytes.toInt(), | |
+    ConsumerConfig.FETCH_MAX_BYTES_CONFIG to 100.mebibytes.toBytes.toInt(), | |
+    // A record count, not a duration, so it is written as a plain number. | |
+    // NOTE(review): 10000 is the value this entry has always produced; confirm a poll *interval* was not intended. | |
+    ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 10_000, | |
+    ConsumerConfig.RECEIVE_BUFFER_CONFIG to 8.mebibytes.toBytes.toInt(), | |
+    ConsumerConfig.SEND_BUFFER_CONFIG to 8.mebibytes.toBytes.toInt(), | |
+    ProducerConfig.BATCH_SIZE_CONFIG to 512.kibibytes.toBytes.toInt(), | |
+    ProducerConfig.MAX_REQUEST_SIZE_CONFIG to 6.mebibytes.toBytes.toInt(), | |
+    ProducerConfig.BUFFER_MEMORY_CONFIG to 128.mebibytes.toBytes | |
+) | |
+ | |
+ | |
+/** | |
+ * Command-line entry point that builds and runs the Kafka Streams topology of the conversions sink. | |
+ * | |
+ * Configuration is layered: built-in defaults first, then the .properties file given as the first | |
+ * positional argument, then the fixed application id and the optional bootstrap-servers override. | |
+ */ | |
+@Command( | |
+    name = APPLICATION_NAME, | |
+    mixinStandardHelpOptions = true, | |
+    version = [APPLICATION_VERSION] | |
+) | |
+class ConversionsSink : Callable<Int> { | |
+    private val props: Properties = Properties() | |
+ | |
+    @Parameters(index = "0", description = ["path to configuration .properties file."]) | |
+    lateinit var configPath: Path | |
+ | |
+    // "--boostrap-servers" is the historical misspelling; it stays as an alias so existing invocations keep working. | |
+    @CommandLine.Option( | |
+        names = ["--bootstrap-servers", "--boostrap-servers"], | |
+        description = ["The server(s) to connect to."] | |
+    ) | |
+    var bootstrapServers: String? = null | |
+ | |
+    /** | |
+     * Loads the configuration, starts the streams application and blocks until the JVM shuts down. | |
+     * | |
+     * @return 0 once the streams instance has been closed by the shutdown hook. | |
+     */ | |
+    override fun call(): Int { | |
+        props.putAll(CONFIG_DEFAULTS) | |
+        // `use` closes the file stream even when loading fails. | |
+        configPath.toFile().inputStream().use { props.load(it) } | |
+        props[StreamsConfig.APPLICATION_ID_CONFIG] = APPLICATION_ID | |
+        // The command-line value wins over whatever the properties file contained. | |
+        bootstrapServers?.let { props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = it } | |
+ | |
+        val config = ConversionsSinkConfig(props) | |
+        logger.info { "config $config" } | |
+ | |
+        // Copy every entry: Properties.stringPropertyNames() skips entries whose value is not a String, | |
+        // which would drop the Class- and Boolean-valued defaults (subject name strategies, | |
+        // auto.register.schemas) from the serde configuration. | |
+        val serdeConfig: Map<String, Any?> = | |
+            props.entries.associate { (key, value) -> key.toString() to value } | |
+ | |
+        val topology: Topology = StreamsBuilder().apply { | |
+            val e3Stream: KStream<String, EnrichedEventAvro> = | |
+                stream( | |
+                    config.enrichedConversionsTopic, | |
+                    Consumed.with( | |
+                        Serdes.String(), | |
+                        specificAvroSerde<EnrichedEventAvro>(serdeConfig, false) | |
+                    ) | |
+                ) | |
+ | |
+            // TODO: the mapped stream is not written anywhere yet (config.olapEventsTopic is unused). | |
+            e3Stream.mapValues(EnrichedConversionMapper()) | |
+        }.build() | |
+ | |
+        val streams = KafkaStreams(topology, props) | |
+        val stopped = java.util.concurrent.CountDownLatch(1) | |
+ | |
+        // Register the hook before starting so a signal arriving during startup still closes the instance. | |
+        Runtime.getRuntime().addShutdownHook(Thread { | |
+            streams.close() | |
+            stopped.countDown() | |
+        }) | |
+ | |
+        streams.start() | |
+ | |
+        // main() hands this method's result to exitProcess(); returning straight after start() | |
+        // would terminate the JVM before a single record is processed, so wait for shutdown here. | |
+        stopped.await() | |
+ | |
+        return 0 | |
+    } | |
+ | |
+    companion object { | |
+        /** Creates the picocli command line wrapping a fresh [ConversionsSink]. */ | |
+        fun cli(): CommandLine = CommandLine(ConversionsSink()) | |
+ | |
+        /** Logs the descriptor of each EnrichedConversion field, then runs the CLI and exits with its status. */ | |
+        @JvmStatic | |
+        fun main(args: Array<String>) { | |
+            for (field in EnrichedConversion.getDescriptor().fields) { | |
+                logger.info { "field descriptor=$field options = ${field.options}" } | |
+            } | |
+ | |
+            exitProcess(cli().execute(*args)) | |
+        } | |
+    } | |
+} | |
+ | |
+/** | |
+ * Typed view of the sink-specific settings, validated against [CONFIG_DEF]. | |
+ * | |
+ * Both topic names are required: they are declared with `ConfigDef.NO_DEFAULT_VALUE`, so a missing | |
+ * entry is reported as a ConfigException when this object is constructed. With a `null` default the | |
+ * non-null getters below could hand out `null` and fail later, far from the cause. | |
+ * | |
+ * @param originals raw configuration entries, passed on to [AbstractConfig]. | |
+ */ | |
+internal class ConversionsSinkConfig(originals: Map<Any, Any>) : | |
+    AbstractConfig(CONFIG_DEF, originals) { | |
+    /** Value of `topic.enriched_conversions`. */ | |
+    val enrichedConversionsTopic: String get() = getString(TOPIC_ENRICHED_CONVERSIONS_CONFIG) | |
+ | |
+    /** Value of `topic.olap_events`. */ | |
+    val olapEventsTopic: String get() = getString(TOPIC_OLAP_EVENT_CONFIG) | |
+ | |
+    companion object { | |
+        const val TOPIC_ENRICHED_CONVERSIONS_CONFIG = "topic.enriched_conversions" | |
+        const val TOPIC_OLAP_EVENT_CONFIG = "topic.olap_events" | |
+ | |
+        val CONFIG_DEF: ConfigDef = ConfigDef() | |
+            .define( | |
+                TOPIC_ENRICHED_CONVERSIONS_CONFIG, | |
+                ConfigDef.Type.STRING, | |
+                ConfigDef.NO_DEFAULT_VALUE, | |
+                ConfigDef.NonEmptyString(), | |
+                ConfigDef.Importance.HIGH, | |
+                "name of topic to consume E3 conversion events from" | |
+            ) | |
+            .define( | |
+                TOPIC_OLAP_EVENT_CONFIG, | |
+                ConfigDef.Type.STRING, | |
+                ConfigDef.NO_DEFAULT_VALUE, | |
+                ConfigDef.NonEmptyString(), | |
+                ConfigDef.Importance.HIGH, | |
+                "name of topic to produce OLAP events to" | |
+            ) | |
+    } | |
+} | |
+ | |
+/** | |
+ * Value mapper from [EnrichedEventAvro] to [EnrichedConversion]. | |
+ * | |
+ * The three extractor functions default to the corresponding getters of the Avro record and can be | |
+ * replaced through the constructor. The mapping itself is not written yet: [apply] invokes the | |
+ * extractors and then stops at `TODO()`, so every call currently throws NotImplementedError. | |
+ */ | |
+internal class EnrichedConversionMapper( | |
+    val experimentsMapper: (EnrichedEventAvro) -> List<LayerStateAvro> = EnrichedEventAvro::getExperiments, | |
+    val attributesMapper: (EnrichedEventAvro) -> List<SegmentAvro> = EnrichedEventAvro::getAttributes, | |
+    val tagsMapper: (EnrichedEventAvro) -> Map<CharSequence, CharSequence> = EnrichedEventAvro::getTags | |
+) : ValueMapper<EnrichedEventAvro, EnrichedConversion> { | |
+    override fun apply(value: EnrichedEventAvro): EnrichedConversion { | |
+        val conversion = EnrichedConversion.newBuilder() | |
+ | |
+        // Inputs gathered for the mapping that is still to be written. | |
+        val layerStates = experimentsMapper(value) | |
+        val segments = attributesMapper(value) | |
+        val eventTags = tagsMapper(value) | |
+ | |
+        TODO() | |
+ | |
+        // Not reached until the TODO above is replaced by the real field mapping. | |
+        return conversion.build() | |
+    } | |
+} | |
+ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment