This is a series of gists documenting testing done with the numeric.mapping
option in Kafka Connect.
- Oracle
- MS SQL Server
- Postgres
- MySQL - n/a because of #563
—@rmoff January 9, 2019
 | col1 | col2 | col3 | col4
---|---|---|---|---
MSSQL column definition | DECIMAL(5,2) | NUMERIC(5,2) | DECIMAL(5) | DECIMAL
MSSQL created column | decimal, length 5, precision 5, scale 2 | numeric, length 5, precision 5, scale 2 | decimal, length 5, precision 5, scale 0 | decimal, length 9, precision 18, scale 0
Source data in MSSQL | 100.01 | -100.02 | 100 | 100
numeric.mapping = none (same as leaving it unset) | Bytes '\u0011 | Bytes Øî | Bytes d | Bytes d
numeric.mapping = best_fit | Bytes '\u0011 | Double -100.02 | Bytes d | Bytes d
numeric.mapping = best_fit (query used to CAST all DECIMAL fields to NUMERIC) | Double 100.01 | Double -100.02 | Int 100 | Int 100
numeric.mapping = precision_only | Bytes '\u0011 | Bytes Øî | Bytes d | Bytes d
Since MS SQL accepts both DECIMAL and NUMERIC as data types, use NUMERIC for Kafka Connect to correctly ingest the values when using numeric.mapping=best_fit. If changing the source schema isn't an option, you can use query mode instead, as demonstrated below.
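If changing the schema is an option, switching a DECIMAL column over to NUMERIC is a one-line change. A sketch only (assuming the sqlcmd client can reach the same mssql host, and that connect_user has ALTER permission; neither is shown elsewhere in this gist):

sqlcmd -S mssql -U connect_user -P 'Asgard123' -d demo -Q "ALTER TABLE demo.NUM_TEST ALTER COLUMN AMOUNT_01 NUMERIC(5,2);"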
CREATE TABLE demo.NUM_TEST (
TXN_ID INT,
CUSTOMER_ID INT,
AMOUNT_01 DECIMAL(5,2),
AMOUNT_02 NUMERIC(5,2),
AMOUNT_03 DECIMAL(5),
AMOUNT_04 DECIMAL
);
INSERT INTO demo..NUM_TEST VALUES (42,42,100.01, -100.02, 100, 100);
numeric.mapping is left unset.
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mssql_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
"connection.user": "connect_user",
"connection.password": "Asgard123",
"topic.prefix": "mssql-01-",
"table.whitelist" : "demo.dbo.NUM_TEST",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'
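Before checking the data it can be worth confirming that the connector and its task are actually running, using the same Kafka Connect REST endpoint (jq is optional, just for pretty-printing):

curl -s http://localhost:8083/connectors/jdbc_source_mssql_01/status | jq '.'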
Output data - all NUMERIC fields are bytes
ksql> PRINT 'mssql-01-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/8/19 1:06:17 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": {"bytes": "Øî"}, "AMOUNT_03": {"bytes": "d"}, "AMOUNT_04": {"bytes": "d"}}
Schema
$ curl -s "http://localhost:8081/subjects/mssql-01-NUM_TEST-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
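The bytes values above are the standard Connect/Avro decimal encoding: a big-endian two's-complement unscaled integer, with the scale carried in the schema (connect.parameters.scale). For example AMOUNT_01 is the two bytes 0x27 0x11 with scale 2, which can be decoded by hand; a quick sketch with bash and bc (not part of the original test):

echo "scale=2; $((16#2711)) / 10^2" | bc
# 100.01

Negative values such as AMOUNT_02 (0xD8 0xEE) additionally need the two's-complement sign handling, which is one reason letting the connector map them to DOUBLE/INT is usually preferable.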
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mssql_02",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
"connection.user": "connect_user",
"connection.password": "Asgard123",
"topic.prefix": "mssql-02-",
"table.whitelist" : "demo.dbo.NUM_TEST",
"mode":"bulk",
"numeric.mapping": "best_fit",
"poll.interval.ms" : 3600000
}
}'
Output data - NUMERIC field is DOUBLE (FLOAT), but all DECIMAL fields remain as bytes
ksql> PRINT 'mssql-02-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/8/19 2:07:06 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": -100.02, "AMOUNT_03": {"bytes": "d"}, "AMOUNT_04": {"bytes": "d"}}
Schema
$ curl -s "http://localhost:8081/subjects/mssql-02-NUM_TEST-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
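The same topic can also be inspected without KSQL using the Avro console consumer that ships with the Schema Registry (the broker address localhost:9092 is an assumption; it isn't stated elsewhere in this gist):

kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic mssql-02-NUM_TEST \
  --from-beginning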
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mssql_03",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
"connection.user": "connect_user",
"connection.password": "Asgard123",
"topic.prefix": "mssql-03-",
"table.whitelist" : "demo.dbo.NUM_TEST",
"mode":"bulk",
"numeric.mapping": "precision_only",
"poll.interval.ms" : 3600000
}
}'
Output data - all fields are still bytes. This is because the only zero-scale fields here are DECIMAL, not NUMERIC. If there were a zero-scale NUMERIC field we would expect to see it handled correctly here and cast to INT.
ksql> PRINT 'mssql-03-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/8/19 2:09:16 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": {"bytes": "Øî"}, "AMOUNT_03": {"bytes": "d"}, "AMOUNT_04": {"bytes": "d"}}
Schema
$ curl -s "http://localhost:8081/subjects/mssql-03-NUM_TEST-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
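To double-check the explanation above, a zero-scale NUMERIC column could be added to the table and the connector re-created; under precision_only that column should then arrive as an INT while the DECIMAL columns stay as bytes. Sketch only, with the same sqlcmd assumptions as earlier:

sqlcmd -S mssql -U connect_user -P 'Asgard123' -d demo -Q "ALTER TABLE demo.NUM_TEST ADD AMOUNT_05 NUMERIC(5);"
sqlcmd -S mssql -U connect_user -P 'Asgard123' -d demo -Q "UPDATE demo.NUM_TEST SET AMOUNT_05 = 100;"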
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mssql_04",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
"connection.user": "connect_user",
"connection.password": "Asgard123",
"topic.prefix": "mssql-04-NUM_TEST",
"query" : "SELECT TXN_ID ,CUSTOMER_ID ,CAST(AMOUNT_01 AS NUMERIC(5,2)) AS AMOUNT_01 ,AMOUNT_02 ,CAST(AMOUNT_03 AS NUMERIC(5)) AS AMOUNT_03 ,CAST(AMOUNT_04 AS NUMERIC) AS AMOUNT_04 FROM NUM_TEST",
"mode":"bulk",
"numeric.mapping": "best_fit",
"poll.interval.ms" : 3600000
}
}'
Output data - NUMERIC fields are DOUBLE (FLOAT) and INT as appropriate
ksql> PRINT 'mssql-04-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/8/19 2:18:30 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": 100.01, "AMOUNT_02": -100.02, "AMOUNT_03": 100, "AMOUNT_04": 100}
Schema
$ curl -s "http://localhost:8081/subjects/mssql-04-NUM_TEST-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
"long"
],
"default": null
}
 | int_col_A | int_col_B | col1 | col2 | col3 | col4 | col5 | col6 | col7
---|---|---|---|---|---|---|---|---|---
Oracle DDL | INT | INT | DECIMAL(5,2) | NUMERIC(5,2) | DECIMAL(5) | DECIMAL | NUMBER(5,2) | NUMBER(5) | NUMBER
Oracle created column | NUMBER(38) | NUMBER(38) | NUMBER(5,2) | NUMBER(5,2) | NUMBER(5) | NUMBER(38) | NUMBER(5,2) | NUMBER(5) | NUMBER
Source data INSERTed to Oracle | 42 | 42 | 100.01 | -100.02 | 100.03 | 100.04 | 100.05 | 100.06 | 100.07
Data SELECTed from Oracle | 42 | 42 | 100.01 | -100.02 | 100 | 100 | 100.05 | 100 | 100.07
numeric.mapping = none (same as leaving it unset) | Bytes * | Bytes * | Bytes '\u0011 | Bytes Øî | Bytes d | Bytes d | Bytes '\u0015 | Bytes d | Bytes \u0017\u0018¦ÚÿôbÙJ31\u008D6ë-vA\u0099Üåù_'c\u0005ÔÒÒÑCìæÊ£\nð÷)À\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000
numeric.mapping = best_fit | Bytes * | Bytes * | Double 100.01 | Double -100.02 | Int 100 | Bytes d | Double 100.05 | Int 100 | Bytes \u0017\u0018¦ÚÿôbÙJ31\u008D6ë-vA\u0099Üåù_'c\u0005ÔÒÒÑCìæÊ£\nð÷)À\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000
numeric.mapping = precision_only | Bytes * | Bytes * | Bytes '\u0011 | Bytes Øî | Int 100 | Bytes d | Bytes '\u0015 | Int 100 | Bytes \u0017\u0018¦ÚÿôbÙJ31\u008D6ë-vA\u0099Üåù_'c\u0005ÔÒÒÑCìæÊ£\nð÷)À\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000
numeric.mapping = best_fit (query used to CAST all fields to NUMBER with explicit precision and scale) | Int 42 | Int 42 | Double 100.01 | Double -100.02 | Int 100 | Double 100.0 | Double 100.05 | Double 100.0 | Double 100.07
For numeric.mapping to work with Oracle, the NUMBER should have a declared precision. If it doesn't then Oracle creates it with a precision of 38 (NUMBER(38)), which Connect cannot store in a type other than the bytes/BigDecimal representation. Therefore:
- don't use a bare NUMBER, but create the columns as NUMBER(9,2) (or however big they need to be to store the values), or
- use the query option to cast the source data to appropriate data types.
Note that Oracle stores DECIMAL, NUMERIC, and INT as NUMBER fields.
CREATE TABLE NUM_TEST (
TXN_ID INT,
CUSTOMER_ID INT,
AMOUNT_01 DECIMAL(5,2),
AMOUNT_02 NUMERIC(5,2),
AMOUNT_03 DECIMAL(5),
AMOUNT_04 DECIMAL,
AMOUNT_05 NUMBER(5,2),
AMOUNT_06 NUMBER(5),
AMOUNT_07 NUMBER
);
INSERT INTO NUM_TEST VALUES (42,42,100.01, -100.02, 100.03, 100.04, 100.05, 100.06, 100.07);
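To confirm what Oracle has actually declared for each column (and see how INT and a bare DECIMAL end up as NUMBER(38), per the table above), the data dictionary can be queried directly. A sketch using sqlplus with the same credentials and EZConnect address as the connector configs below (sqlplus availability is assumed):

sqlplus -S connect_user/asgard@//oracle:1521/ORCLPDB1 <<EOF
SELECT column_name, data_type, data_precision, data_scale FROM user_tab_columns WHERE table_name = 'NUM_TEST';
EOF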
numeric.mapping is left unset.
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_oracle_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "oracle-01-",
"table.whitelist" : "NUM_TEST",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'
Output data - all fields are written as bytes (including those specified as INT)
ksql> PRINT 'oracle-01-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/9/19 12:04:34 PM UTC, null, {"TXN_ID": {"bytes": "*"}, "CUSTOMER_ID": {"bytes": "*"}, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": {"bytes": "Øî"}, "AMOUNT_03": {"bytes": "d"}, "AMOUNT_04": {"bytes": "d"}, "AMOUNT_05": {"bytes": "'\u0015"}, "AMOUNT_06": {"bytes": "d"}, "AMOUNT_07": {"bytes": "\u0017\u0019> ëA³§f)Íî¥Ã¬\u0088z ±³ºÈ0µ)ÜÒ\u0095)\u008DííY\u0094u¹\u0004`\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000"}}
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_oracle_02",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "oracle-02-",
"table.whitelist" : "NUM_TEST",
"mode":"bulk",
"numeric.mapping": "best_fit",
"poll.interval.ms" : 3600000
}
}'
Output data
ksql> PRINT 'oracle-02-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/9/19 12:04:52 PM UTC, null, {"TXN_ID": {"bytes": "*"}, "CUSTOMER_ID": {"bytes": "*"}, "AMOUNT_01": 100.01, "AMOUNT_02": -100.02, "AMOUNT_03": 100, "AMOUNT_04": {"bytes": "d"}, "AMOUNT_05": 100.05, "AMOUNT_06": 100, "AMOUNT_07": {"bytes": "\u0017\u0019> ëA³§f)Íî¥Ã¬\u0088z ±³ºÈ0µ)ÜÒ\u0095)\u008DííY\u0094u¹\u0004`\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000"}}
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_oracle_03",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "oracle-03-",
"table.whitelist" : "NUM_TEST",
"mode":"bulk",
"numeric.mapping": "precision_only",
"poll.interval.ms" : 3600000
}
}'
Output data
ksql> PRINT 'oracle-03-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/9/19 12:05:01 PM UTC, null, {"TXN_ID": {"bytes": "*"}, "CUSTOMER_ID": {"bytes": "*"}, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": {"bytes": "Øî"}, "AMOUNT_03": 100, "AMOUNT_04": {"bytes": "d"}, "AMOUNT_05": {"bytes": "'\u0015"}, "AMOUNT_06": 100, "AMOUNT_07": {"bytes": "\u0017\u0019> ëA³§f)Íî¥Ã¬\u0088z ±³ºÈ0µ)ÜÒ\u0095)\u008DííY\u0094u¹\u0004`\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000"}}
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_oracle_04",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "oracle-04-NUM_TEST",
"mode":"bulk",
"numeric.mapping": "best_fit",
"query" : "SELECT CAST(TXN_ID AS NUMBER(5,0)) AS TXN_ID,CAST(CUSTOMER_ID AS NUMBER(5,0)) AS CUSTOMER_ID, AMOUNT_01 ,AMOUNT_02 ,AMOUNT_03 ,CAST(AMOUNT_04 AS NUMBER(5,2)) AS AMOUNT_04 ,AMOUNT_05, CAST(AMOUNT_06 AS NUMBER(5,2)) AS AMOUNT_06, CAST(AMOUNT_07 AS NUMBER(5,2)) AS AMOUNT_07 FROM NUM_TEST",
"poll.interval.ms" : 3600000
}
}'
Output data - all fields are DOUBLE (FLOAT) and INT as appropriate
ksql> PRINT 'oracle-04-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/9/19 12:12:19 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": 100.01, "AMOUNT_02": -100.02, "AMOUNT_03": 100, "AMOUNT_04": 100.0, "AMOUNT_05": 100.05, "AMOUNT_06": 100.0, "AMOUNT_07": 100.07}
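If the connector output doesn't look right, the CAST expressions can first be checked directly against Oracle. A sketch with sqlplus (same connection assumptions as above):

sqlplus -S connect_user/asgard@//oracle:1521/ORCLPDB1 <<EOF
SELECT CAST(AMOUNT_04 AS NUMBER(5,2)) AS AMOUNT_04, CAST(AMOUNT_07 AS NUMBER(5,2)) AS AMOUNT_07 FROM NUM_TEST;
EOF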
$ curl -s "http://localhost:8081/subjects/oracle-01-NUM_TEST-value/versions/latest"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_05",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_06",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_07",
"type": [
"null",
{
"type": "bytes",
"scale": 127,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "127"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
$ curl -s "http://localhost:8081/subjects/oracle-02-NUM_TEST-value/versions/latest"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_05",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_06",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "AMOUNT_07",
"type": [
"null",
{
"type": "bytes",
"scale": 127,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "127"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
$ curl -s "http://localhost:8081/subjects/oracle-03-NUM_TEST-value/versions/latest"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_05",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_06",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "AMOUNT_07",
"type": [
"null",
{
"type": "bytes",
"scale": 127,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "127"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
$ curl -s "http://localhost:8081/subjects/oracle-04-NUM_TEST-value/versions/latest"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_05",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_06",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_07",
"type": [
"null",
"double"
],
"default": null
}
 | col1 | col2 | col3 | col4
---|---|---|---|---
Postgres column definition | DECIMAL(5,2) | NUMERIC(5,2) | DECIMAL(5) | DECIMAL
Source data in Postgres | 100.01 | -100.02 | 100 | 100
numeric.mapping = none (same as leaving it unset) | Bytes '\u0011 | Bytes Øî | Bytes d | Bytes d
numeric.mapping = best_fit | Double 100.01 | Double -100.02 | Int 100 | Int 100
numeric.mapping = precision_only | Bytes '\u0011 | Bytes Øî | Int 100 | Int 100
CREATE TABLE demo.NUM_TEST (
TXN_ID INT,
CUSTOMER_ID INT,
AMOUNT_01 DECIMAL(5,2),
AMOUNT_02 NUMERIC(5,2),
AMOUNT_03 DECIMAL(5),
AMOUNT_04 DECIMAL
);
-- The precision represents the number of significant digits that are stored for values
-- the scale represents the number of digits that can be stored following the decimal point.
INSERT INTO demo.NUM_TEST VALUES (42,42,100.01, -100.02, 100, 100);
-- postgres=# SELECT * FROM demo.NUM_TEST;
-- txn_id | customer_id | amount_01 | amount_02 | amount_03 | amount_04
-- --------+-------------+-----------+-----------+-----------+-----------
-- 42 | 42 | 100.01 | -100.02 | 100 | 100.04
-- 42 | 42 | 200.01 | -200.02 | 200 | 200
-- (1 row)
-- postgres=# \d demo.NUM_TEST;
-- Table "demo.num_test"
-- Column | Type | Collation | Nullable | Default
-- -------------+--------------+-----------+----------+---------
-- txn_id | integer | | |
-- customer_id | integer | | |
-- amount_01 | numeric(5,2) | | |
-- amount_02 | numeric(5,2) | | |
-- amount_03 | numeric(5,0) | | |
-- amount_04 | numeric | | |
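The commented-out psql output above can be reproduced against the same database with the credentials used by the connectors below (passing the password via PGPASSWORD is just one option):

PGPASSWORD=asgard psql -h postgres -U connect_user -d postgres -c '\d demo.num_test'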
numeric.mapping is left unset.
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres_15",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "postgres-15-",
"table.whitelist" : "demo.num_test",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'
Output data - all NUMERIC fields are bytes
ksql> print 'postgres-15-num_test' from beginning;
Format:AVRO
1/7/19 4:25:13 PM UTC, null, {"txn_id": 42, "customer_id": 42, "amount_01": {"bytes": "'\u0011"}, "amount_02": {"bytes": "Øî"}, "amount_03": {"bytes": "d"}, "amount_04": {"bytes": "d"}}
Schema
$ curl -s "http://localhost:8081/subjects/postgres-15-num_test-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("amount"))'
{
"name": "amount_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_02",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_03",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres_12",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "postgres-12-",
"numeric.mapping": "best_fit",
"table.whitelist" : "demo.num_test",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'
Output data - NUMERIC fields are DOUBLE (FLOAT) and INT as appropriate
ksql> print 'postgres-12-num_test' from beginning;
Format:AVRO
1/7/19 4:27:08 PM UTC, null, {"txn_id": 42, "customer_id": 42, "amount_01": 100.01, "amount_02": -100.02, "amount_03": 100, "amount_04": 100}
Schema
$ curl -s "http://localhost:8081/subjects/postgres-12-num_test-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("amount"))'
{
"name": "amount_01",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "amount_02",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "amount_03",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "amount_04",
"type": [
"null",
{
"type": "int",
"connect.type": "int8"
}
],
"default": null
}
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres_13",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "postgres-13-",
"numeric.mapping": "none",
"table.whitelist" : "demo.num_test",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'
Output data - all NUMERIC fields are bytes
ksql> print 'postgres-13-num_test' from beginning;
Format:AVRO
1/7/19 4:27:52 PM UTC, null, {"txn_id": 42, "customer_id": 42, "amount_01": {"bytes": "'\u0011"}, "amount_02": {"bytes": "Øî"}, "amount_03": {"bytes": "d"}, "amount_04": {"bytes": "d"}}
Schema
$ curl -s "http://localhost:8081/subjects/postgres-13-num_test-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("amount"))'
{
"name": "amount_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_02",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_03",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres_14",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "postgres-14-",
"numeric.mapping": "precision_only",
"table.whitelist" : "demo.num_test",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'
Output data - zero scale NUMERIC fields are INT, others are still bytes
ksql> print 'postgres-14-num_test' from beginning;
Format:AVRO
1/7/19 4:05:57 PM UTC, null, {"txn_id": 42, "customer_id": 42, "amount_01": {"bytes": "'\u0011"}, "amount_02": {"bytes": "Øî"}, "amount_03": 100, "amount_04": 100}
Schema
$ curl -s "http://localhost:8081/subjects/postgres-14-num_test-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("amount"))'
{
"name": "amount_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_02",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_03",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "amount_04",
"type": [
"null",
{
"type": "int",
"connect.type": "int8"
}
],
"default": null
}
If a column is reported to the connector as having zero scale (e.g. NUMERIC(5), or a bare NUMERIC/DECIMAL, which is reported with scale 0) but it ends up holding a value with a fractional part, the INSERT itself succeeds but the connector ignores the whole record when it queries the table, logging a warning:
[2019-01-07 16:04:44,456] WARN Ignoring record due to SQL error: (io.confluent.connect.jdbc.source.BulkTableQuerier)
org.postgresql.util.PSQLException: Bad value for type BigDecimal : 100.04
at org.postgresql.jdbc2.AbstractJdbc2ResultSet.scaleBigDecimal(AbstractJdbc2ResultSet.java:3131)
at org.postgresql.jdbc2.AbstractJdbc2ResultSet.getBigDecimal(AbstractJdbc2ResultSet.java:2469)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$columnConverterFor$20(GenericDatabaseDialect.java:1177)
at io.confluent.connect.jdbc.source.SchemaMapping$FieldSetter.setField(SchemaMapping.java:158)
at io.confluent.connect.jdbc.source.BulkTableQuerier.extractRecord(BulkTableQuerier.java:76)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:309)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
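One way to reproduce that warning (a sketch, reusing the psql invocation from earlier) is to put a fractional value into the unconstrained amount_04 column, whose reported scale is 0 in the schemas above:

PGPASSWORD=asgard psql -h postgres -U connect_user -d postgres -c "INSERT INTO demo.NUM_TEST VALUES (43, 43, 100.01, -100.02, 100, 100.04);"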
Thank you for your work. I want to add that, for Oracle, it is important to know the threshold for numeric.mapping=best_fit to recognize an integer: for an Oracle column of type NUMBER(p,s), the precision has to be p < 19 for the value to fit in a primitive data type (see also the code).
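A sketch (hypothetical connector, not run as part of these tests) that would make that cutoff visible: cast the same value to NUMBER(18,0) and NUMBER(19,0) and compare the resulting Avro field types; under best_fit the first should map to a long, while the second should fall back to bytes/Decimal.

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "jdbc_source_oracle_05",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
    "connection.user": "connect_user",
    "connection.password": "asgard",
    "topic.prefix": "oracle-05-NUM_TEST",
    "mode": "bulk",
    "numeric.mapping": "best_fit",
    "query": "SELECT CAST(AMOUNT_03 AS NUMBER(18,0)) AS AMOUNT_03_P18, CAST(AMOUNT_03 AS NUMBER(19,0)) AS AMOUNT_03_P19 FROM NUM_TEST",
    "poll.interval.ms" : 3600000
  }
}'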