Forked from mtraynham/PostgreSqlWithJsonDatabaseDialect.kt
Created
March 12, 2022 00:44
-
-
Save alex88/6f231f7c94a48c0632ab0d09aaa2d723 to your computer and use it in GitHub Desktop.
Kafka Connect Postgres JSONB Dialect (Kafka 2.2.0)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.project.kafka.connect.jdbc.dialect | |
import com.project.kafka.connect.utils.JsonUtils | |
import io.confluent.connect.jdbc.dialect.DatabaseDialect | |
import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider | |
import io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect | |
import io.confluent.connect.jdbc.sink.metadata.SinkRecordField | |
import java.sql.PreparedStatement | |
import org.apache.kafka.common.config.AbstractConfig | |
import org.apache.kafka.connect.data.Schema | |
import org.postgresql.util.PGobject | |
/**
 * PostgreSQL dialect that maps Connect `ARRAY`, `MAP` and `STRUCT` schema types
 * to the `jsonb` column type and binds their values as JSON text.
 *
 * Every other schema type is delegated unchanged to [PostgreSqlDatabaseDialect].
 */
class PostgreSqlWithJsonDatabaseDialect(
    config: AbstractConfig
) : PostgreSqlDatabaseDialect(config) {

    /**
     * Returns `jsonb` for fields whose schema type is one of [JSON_SCHEMA_TYPES];
     * otherwise returns whatever the parent dialect reports for [field].
     */
    override fun getSqlType(field: SinkRecordField): String =
        if (field.schemaType() in JSON_SCHEMA_TYPES) JSON_FIELD_TYPE else super.getSqlType(field)

    /**
     * Binds [value] at position [index] of [statement].
     *
     * When the type of [schema] is one of [JSON_SCHEMA_TYPES], the value is converted
     * with [JsonUtils.convertToJson], wrapped in a [PGobject] typed as `jsonb`, and
     * bound with `setObject`. Any other schema type is handed to the parent dialect.
     *
     * @return `true` when the value was bound here as `jsonb`; otherwise the
     * result of the parent implementation.
     */
    override fun maybeBindLogical(
        statement: PreparedStatement,
        index: Int,
        schema: Schema,
        value: Any?
    ): Boolean {
        // Not a JSON-mapped type: let the parent dialect decide how to bind it.
        if (schema.type() !in JSON_SCHEMA_TYPES) {
            return super.maybeBindLogical(statement, index, schema, value)
        }

        val jsonbParameter = PGobject().also { pgObject ->
            pgObject.type = JSON_FIELD_TYPE
            pgObject.value = JsonUtils.convertToJson(schema, value).toString()
        }
        statement.setObject(index, jsonbParameter)
        return true
    }

    /**
     * Dialect provider registered for the `postgresql` subprotocol that creates
     * [PostgreSqlWithJsonDatabaseDialect] instances.
     */
    class Provider : DatabaseDialectProvider.SubprotocolBasedProvider(
        PostgreSqlWithJsonDatabaseDialect::class.java.simpleName,
        listOf("postgresql")
    ) {
        /** Creates the JSON-aware dialect for the given connector [config]. */
        override fun create(config: AbstractConfig): DatabaseDialect =
            PostgreSqlWithJsonDatabaseDialect(config)

        /**
         * Scores this provider against [urlInfo].
         *
         * A parent score greater than 0 means the URL matched "postgresql" in some way;
         * in that case the score is bumped by 1 so this provider outranks the standard
         * [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.Provider].
         * Scores of 0 or less are returned unchanged.
         */
        override fun score(urlInfo: JdbcUrlInfo?): Int {
            val parentScore = super.score(urlInfo)
            return if (parentScore > 0) parentScore + 1 else parentScore
        }
    }

    companion object {
        /** PostgreSQL column/parameter type used for JSON-mapped values. */
        private const val JSON_FIELD_TYPE = "jsonb"

        /** Connect schema types that this dialect stores as `jsonb`. */
        private val JSON_SCHEMA_TYPES = setOf(
            Schema.Type.ARRAY,
            Schema.Type.MAP,
            Schema.Type.STRUCT
        )
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment