Created
March 12, 2022 00:16
-
-
Save mtraynham/a4d4742e1ba712590e6b7c346b8d37bc to your computer and use it in GitHub Desktop.
Kafka Connect Postgres JSONB Dialect (Kafka 2.2.0)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.project.kafka.connect.jdbc.dialect | |
import com.project.kafka.connect.utils.JsonUtils | |
import io.confluent.connect.jdbc.dialect.DatabaseDialect | |
import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider | |
import io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect | |
import io.confluent.connect.jdbc.sink.metadata.SinkRecordField | |
import java.sql.PreparedStatement | |
import org.apache.kafka.common.config.AbstractConfig | |
import org.apache.kafka.connect.data.Schema | |
import org.postgresql.util.PGobject | |
class PostgreSqlWithJsonDatabaseDialect( | |
config: AbstractConfig | |
) : PostgreSqlDatabaseDialect(config) { | |
companion object { | |
private const val JSON_FIELD_TYPE = "jsonb" | |
private val JSON_SCHEMA_TYPES = setOf( | |
Schema.Type.ARRAY, | |
Schema.Type.MAP, | |
Schema.Type.STRUCT | |
) | |
} | |
override fun getSqlType(field: SinkRecordField): String { | |
return if (JSON_SCHEMA_TYPES.contains(field.schemaType())) | |
JSON_FIELD_TYPE else | |
super.getSqlType(field) | |
} | |
override fun maybeBindLogical( | |
statement: PreparedStatement, | |
index: Int, | |
schema: Schema, | |
value: Any? | |
): Boolean { | |
if (JSON_SCHEMA_TYPES.contains(schema.type())) { | |
val jsonObject = PGobject() | |
jsonObject.type = JSON_FIELD_TYPE | |
jsonObject.value = JsonUtils.convertToJson(schema, value).toString() | |
statement.setObject(index, jsonObject) | |
return true | |
} | |
return super.maybeBindLogical(statement, index, schema, value) | |
} | |
class Provider : DatabaseDialectProvider.SubprotocolBasedProvider( | |
PostgreSqlWithJsonDatabaseDialect::class.java.simpleName, | |
listOf("postgresql") | |
) { | |
override fun create(config: AbstractConfig): DatabaseDialect { | |
return PostgreSqlWithJsonDatabaseDialect(config) | |
} | |
/** | |
* Score would be greater than 0 if this url did in some way match "postgresql" | |
* We simply increment that number by 1 to score higher than the standard | |
* [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.Provider]. | |
*/ | |
override fun score(urlInfo: JdbcUrlInfo?): Int { | |
return super.score(urlInfo) | |
.let { if (it > 0) it + 1 else it } | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment