This is a series of gists documenting testing done with the numeric.mapping option in Kafka Connect.
- Oracle
- MS SQL Server
- Postgres
- MySQL - n/a because of #563
—@rmoff January 9, 2019
| | AMOUNT_01 | AMOUNT_02 | AMOUNT_03 | AMOUNT_04 |
|---|---|---|---|---|
| MSSQL column definition | DECIMAL(5,2) | NUMERIC(5,2) | DECIMAL(5) | DECIMAL |
| MSSQL created column | decimal, length 5, precision 5, scale 2 | numeric, length 5, precision 5, scale 2 | decimal, length 5, precision 5, scale 0 | decimal, length 9, precision 18, scale 0 |
| Source data in MSSQL | 100.01 | -100.02 | 100 | 100 |
| numeric.mapping = none (same as leaving it unset) | Bytes '\u0011 | Bytes Øî | Bytes d | Bytes d |
| numeric.mapping = best_fit | Bytes '\u0011 | Double -100.02 | Bytes d | Bytes d |
| numeric.mapping = best_fit (query used to CAST all DECIMAL fields to NUMERIC) | Double 100.01 | Double -100.02 | Int 100 | Long 100 |
| numeric.mapping = precision_only | Bytes '\u0011 | Bytes Øî | Bytes d | Bytes d |
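MS SQL accepts both DECIMAL and NUMERIC as data types, but in these tests numeric.mapping only took effect for NUMERIC columns; DECIMAL columns stayed as bytes. Use NUMERIC so that Kafka Connect ingests the values correctly when using numeric.mapping=best_fit. If changing the source schema isn't an option, you can use query mode to CAST the DECIMAL columns to NUMERIC, as demonstrated below.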
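The results in the table above are consistent with a fairly simple decision rule. The Python sketch below reproduces it; it is inferred from the test results in these gists, not taken from the connector's code, and `map_numeric` is a hypothetical helper. `jdbc_type` stands for the type the JDBC driver reports for the column: in these tests MSSQL's DECIMAL columns behaved as DECIMAL, while the Postgres and Oracle DECIMAL columns were created as numeric/NUMBER and behaved as NUMERIC. The return value is the Avro type seen in Schema Registry, where "bytes" means the Decimal logical type.

```python
def map_numeric(jdbc_type: str, precision: int, scale: int, mode: str) -> str:
    """Approximate the Avro type produced for a decimal column.

    Hypothetical helper inferred from the test results in these gists, not the
    connector's code. "bytes" means the Decimal logical type.
    """
    # numeric.mapping only affected columns the driver reported as NUMERIC.
    if jdbc_type != "NUMERIC" or mode == "none":
        return "bytes"
    # 19 or more digits cannot fit a 64-bit primitive, so the value stays Decimal.
    if precision >= 19:
        return "bytes"
    # Zero scale: an integer type (INT8/INT16/INT32 all appear as Avro "int").
    if scale == 0:
        return "long" if precision > 9 else "int"
    # Positive scale: only best_fit switches to a double.
    if mode == "best_fit" and scale > 0:
        return "double"
    # Anything else (precision_only with a scale, negative scales) stays Decimal.
    return "bytes"


# MSSQL columns from the table above, with numeric.mapping=best_fit.
for name, jdbc_type, p, s in [("AMOUNT_01", "DECIMAL", 5, 2),
                              ("AMOUNT_02", "NUMERIC", 5, 2),
                              ("AMOUNT_03", "DECIMAL", 5, 0),
                              ("AMOUNT_04", "DECIMAL", 18, 0)]:
    print(name, map_numeric(jdbc_type, p, s, "best_fit"))
```

Treating every negative scale as Decimal is a simplification; the only negative scale these tests touch is Oracle's unqualified NUMBER, which stays as bytes in every mode.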
CREATE TABLE demo.dbo.NUM_TEST (
TXN_ID INT,
CUSTOMER_ID INT,
AMOUNT_01 DECIMAL(5,2),
AMOUNT_02 NUMERIC(5,2),
AMOUNT_03 DECIMAL(5),
AMOUNT_04 DECIMAL
);
INSERT INTO demo..NUM_TEST VALUES (42,42,100.01, -100.02, 100, 100);
numeric.mapping is left unset.
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mssql_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
"connection.user": "connect_user",
"connection.password": "Asgard123",
"topic.prefix": "mssql-01-",
"table.whitelist" : "demo.dbo.NUM_TEST",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'
Output data - all DECIMAL and NUMERIC fields are bytes
ksql> PRINT 'mssql-01-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/8/19 1:06:17 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": {"bytes": "Øî"}, "AMOUNT_03": {"bytes": "d"}, "AMOUNT_04": {"bytes": "d"}}
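The bytes values are Avro's decimal logical type: the unscaled value as a big-endian two's-complement integer, with the scale taken from the schema (shown below). For example, `'\u0011` is the two bytes 0x27 0x11, i.e. 10001, which with scale 2 is 100.01. A minimal Python sketch to decode them, assuming the characters that PRINT displays map one-to-one to Latin-1 byte values:

```python
from decimal import Decimal


def decode_avro_decimal(raw: bytes, scale: int) -> Decimal:
    """Decode an Avro "decimal" value: big-endian two's-complement unscaled integer."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)


# Assumption: PRINT shows each byte as the Latin-1 character with that code,
# so encoding the displayed text as Latin-1 recovers the raw bytes.
for shown, scale in [("'\u0011", 2), ("Øî", 2), ("d", 0)]:
    print(repr(shown), "->", decode_avro_decimal(shown.encode("latin-1"), scale))
```

The same applies to the Oracle output further down, where `*` (0x2A) is 42 and `'\u0015` (0x27 0x15) is 100.05 at scale 2.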
Schema
$ curl -s "http://localhost:8081/subjects/mssql-01-NUM_TEST-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
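The jq filter above can also be scripted to summarise a schema in one line per field. This is a sketch using only the Python standard library; it assumes Schema Registry at localhost:8081 as in the curl command, and `fetch_schema` and `summarise_fields` are hypothetical helper names. Unlike jq's `contains`, the match here is case-insensitive, so it also works for the lowercase Postgres column names.

```python
import json
from urllib.request import urlopen


def fetch_schema(subject: str, registry_url: str = "http://localhost:8081",
                 version: str = "latest") -> str:
    """Return the schema string for a subject, as the curl command above does."""
    with urlopen(f"{registry_url}/subjects/{subject}/versions/{version}") as resp:
        return json.load(resp)["schema"]


def summarise_fields(schema: str, match: str = "amount") -> dict:
    """Map each field whose name contains `match` to a short type description."""
    summary = {}
    for field in json.loads(schema)["fields"]:
        if match.lower() not in field["name"].lower():
            continue
        # Fields are ["null", <type>] unions; take the non-null branch.
        branch = next(t for t in field["type"] if t != "null")
        if isinstance(branch, dict) and branch.get("logicalType") == "decimal":
            summary[field["name"]] = f"decimal(scale={branch['scale']})"
        elif isinstance(branch, dict):
            summary[field["name"]] = branch.get("connect.type", branch["type"])
        else:
            summary[field["name"]] = branch
    return summary
```

For example, `summarise_fields(fetch_schema("mssql-01-NUM_TEST-value"))` would report each AMOUNT field as `decimal(scale=...)`, `double`, `int`, `long` or a Connect type such as `int8`.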
numeric.mapping is set to best_fit.
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mssql_02",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
"connection.user": "connect_user",
"connection.password": "Asgard123",
"topic.prefix": "mssql-02-",
"table.whitelist" : "demo.dbo.NUM_TEST",
"mode":"bulk",
"numeric.mapping": "best_fit",
"poll.interval.ms" : 3600000
}
}'
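The MSSQL connectors in these tests differ only in name, topic.prefix and numeric.mapping. As a sketch, the payload can be generated and POSTed from Python with the standard library; `mssql_source` and `create_connector` are hypothetical helper names, the config values are copied from the curl commands, and the Connect REST API is assumed to be at localhost:8083 as above.

```python
import json
from urllib.request import Request, urlopen


def mssql_source(name: str, topic_prefix: str, numeric_mapping=None) -> dict:
    """Build the connector payload used in these tests."""
    config = {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
        "connection.user": "connect_user",
        "connection.password": "Asgard123",
        "topic.prefix": topic_prefix,
        "table.whitelist": "demo.dbo.NUM_TEST",
        "mode": "bulk",
        "poll.interval.ms": 3600000,
    }
    # Leaving the key out is the same as numeric.mapping=none.
    if numeric_mapping is not None:
        config["numeric.mapping"] = numeric_mapping
    return {"name": name, "config": config}


def create_connector(payload: dict, connect_url: str = "http://localhost:8083") -> dict:
    """POST the payload to the Kafka Connect REST API, like the curl command."""
    request = Request(f"{connect_url}/connectors",
                      data=json.dumps(payload).encode("utf-8"),
                      headers={"Content-Type": "application/json"},
                      method="POST")
    with urlopen(request) as response:
        return json.load(response)
```

For example, `create_connector(mssql_source("jdbc_source_mssql_02", "mssql-02-", "best_fit"))` sends the same request as the curl command above.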
Output data - NUMERIC field is DOUBLE (FLOAT), but all DECIMAL fields remain as bytes
ksql> PRINT 'mssql-02-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/8/19 2:07:06 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": -100.02, "AMOUNT_03": {"bytes": "d"}, "AMOUNT_04": {"bytes": "d"}}
Schema
$ curl -s "http://localhost:8081/subjects/mssql-02-NUM_TEST-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
numeric.mapping is set to precision_only.
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mssql_03",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
"connection.user": "connect_user",
"connection.password": "Asgard123",
"topic.prefix": "mssql-03-",
"table.whitelist" : "demo.dbo.NUM_TEST",
"mode":"bulk",
"numeric.mapping": "precision_only",
"poll.interval.ms" : 3600000
}
}'
Output data - all fields are still bytes. This is because precision_only only applies to zero-scale NUMERIC fields, and the zero-scale fields here (AMOUNT_03 and AMOUNT_04) are DECIMAL, not NUMERIC. If there were a NUMERIC field with zero scale, we would expect it to be handled correctly here and cast to INT.
ksql> PRINT 'mssql-03-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/8/19 2:09:16 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": {"bytes": "Øî"}, "AMOUNT_03": {"bytes": "d"}, "AMOUNT_04": {"bytes": "d"}}
Schema
$ curl -s "http://localhost:8081/subjects/mssql-03-NUM_TEST-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
numeric.mapping is set to best_fit, with a query that CASTs the DECIMAL columns to NUMERIC.
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mssql_04",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
"connection.user": "connect_user",
"connection.password": "Asgard123",
"topic.prefix": "mssql-04-NUM_TEST",
"query" : "SELECT TXN_ID ,CUSTOMER_ID ,CAST(AMOUNT_01 AS NUMERIC(5,2)) AS AMOUNT_01 ,AMOUNT_02 ,CAST(AMOUNT_03 AS NUMERIC(5)) AS AMOUNT_03 ,CAST(AMOUNT_04 AS NUMERIC) AS AMOUNT_04 FROM NUM_TEST",
"mode":"bulk",
"numeric.mapping": "best_fit",
"poll.interval.ms" : 3600000
}
}'
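The query above CASTs each DECIMAL column to NUMERIC with the same precision and scale. For wider tables that query can be generated; `build_cast_query` is a hypothetical helper that only handles the type names used in this test and does no identifier quoting.

```python
def build_cast_query(table: str, columns: list) -> str:
    """Build a SELECT that CASTs DECIMAL columns to NUMERIC with the same precision and scale.

    columns is a list of (name, declared_type) pairs, e.g. ("AMOUNT_01", "DECIMAL(5,2)").
    """
    select_list = []
    for name, declared in columns:
        if declared.upper().startswith("DECIMAL"):
            # Keep any "(p,s)" suffix and just swap the type name.
            numeric = "NUMERIC" + declared[len("DECIMAL"):]
            select_list.append(f"CAST({name} AS {numeric}) AS {name}")
        else:
            select_list.append(name)
    return f"SELECT {', '.join(select_list)} FROM {table}"


print(build_cast_query("NUM_TEST", [
    ("TXN_ID", "INT"), ("CUSTOMER_ID", "INT"),
    ("AMOUNT_01", "DECIMAL(5,2)"), ("AMOUNT_02", "NUMERIC(5,2)"),
    ("AMOUNT_03", "DECIMAL(5)"), ("AMOUNT_04", "DECIMAL"),
]))
```

A bare NUMERIC in SQL Server defaults to precision 18 and scale 0, which is why AMOUNT_04 comes through as long rather than int in the schema below.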
Output data - NUMERIC fields are DOUBLE (FLOAT) and INT as appropriate
ksql> PRINT 'mssql-04-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/8/19 2:18:30 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": 100.01, "AMOUNT_02": -100.02, "AMOUNT_03": 100, "AMOUNT_04": 100}
Schema
$ curl -s "http://localhost:8081/subjects/mssql-04-NUM_TEST-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
"long"
],
"default": null
}
| | TXN_ID | CUSTOMER_ID | AMOUNT_01 | AMOUNT_02 | AMOUNT_03 | AMOUNT_04 | AMOUNT_05 | AMOUNT_06 | AMOUNT_07 |
|---|---|---|---|---|---|---|---|---|---|
| Oracle DDL | INT | INT | DECIMAL(5,2) | NUMERIC(5,2) | DECIMAL(5) | DECIMAL | NUMBER(5,2) | NUMBER(5) | NUMBER |
| Oracle created column | NUMBER(38) | NUMBER(38) | NUMBER(5,2) | NUMBER(5,2) | NUMBER(5) | NUMBER(38) | NUMBER(5,2) | NUMBER(5) | NUMBER |
| Source data INSERTed to Oracle | 42 | 42 | 100.01 | -100.02 | 100.03 | 100.04 | 100.05 | 100.06 | 100.07 |
| Data SELECTed from Oracle | 42 | 42 | 100.01 | -100.02 | 100 | 100 | 100.05 | 100 | 100.07 |
| numeric.mapping = none (same as leaving it unset) | Bytes * | Bytes * | Bytes '\u0011 | Bytes Øî | Bytes d | Bytes d | Bytes '\u0015 | Bytes d | Bytes \u0017\u0018¦ÚÿôbÙJ31\u008D6ë-vA\u0099Üåù_'c\u0005ÔÒÒÑCìæÊ£\nð÷)À\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000 |
| numeric.mapping = best_fit | Bytes * | Bytes * | Double 100.01 | Double -100.02 | Int 100 | Bytes d | Double 100.05 | Int 100 | Bytes \u0017\u0018¦ÚÿôbÙJ31\u008D6ë-vA\u0099Üåù_'c\u0005ÔÒÒÑCìæÊ£\nð÷)À\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000 |
| numeric.mapping = precision_only | Bytes * | Bytes * | Bytes '\u0011 | Bytes Øî | Int 100 | Bytes d | Bytes '\u0015 | Int 100 | Bytes \u0017\u0018¦ÚÿôbÙJ31\u008D6ë-vA\u0099Üåù_'c\u0005ÔÒÒÑCìæÊ£\nð÷)À\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000 |
| numeric.mapping = best_fit (query used to CAST all fields to NUMBER with precision and scale) | Int 42 | Int 42 | Double 100.01 | Double -100.02 | Int 100 | Double 100.0 | Double 100.05 | Double 100.0 | Double 100.07 |
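The SELECTed values differ from the INSERTed ones because Oracle rounds each value to the declared scale of the column it is stored in: the scale-0 columns (AMOUNT_03, AMOUNT_04 and AMOUNT_06) lose their fractional part. A Python sketch of that rounding, assuming Oracle rounds half away from zero (none of the test values is a tie, so that assumption is not exercised here); `round_to_scale` is a hypothetical helper:

```python
from decimal import ROUND_HALF_UP, Decimal


def round_to_scale(value: str, scale: int) -> Decimal:
    """Round a literal to a column's scale, half away from zero (assumed Oracle behaviour)."""
    return Decimal(value).quantize(Decimal(1).scaleb(-scale), rounding=ROUND_HALF_UP)


# (column, inserted value, scale of the created column) from the table above.
for column, inserted, scale in [("AMOUNT_03", "100.03", 0), ("AMOUNT_04", "100.04", 0),
                                ("AMOUNT_05", "100.05", 2), ("AMOUNT_06", "100.06", 0)]:
    print(column, inserted, "->", round_to_scale(inserted, scale))
```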
- For numeric.mapping to work, the NUMBER should have a declared precision. If it doesn't, then Oracle creates it with a precision of 38 (NUMBER(38)), which Connect cannot store in any type other than bytes/BigDecimal.
- Don't use a bare NUMBER, but create the columns as NUMBER(9,2) (or however big they need to be to store the values).
- If you can't change the source DDL, use the query option to cast the source data to appropriate data types.
- Oracle stores DECIMAL, NUMERIC, and INT as NUMBER fields.
CREATE TABLE NUM_TEST (
TXN_ID INT,
CUSTOMER_ID INT,
AMOUNT_01 DECIMAL(5,2),
AMOUNT_02 NUMERIC(5,2),
AMOUNT_03 DECIMAL(5),
AMOUNT_04 DECIMAL,
AMOUNT_05 NUMBER(5,2),
AMOUNT_06 NUMBER(5),
AMOUNT_07 NUMBER
);
INSERT INTO NUM_TEST VALUES (42,42,100.01, -100.02, 100.03, 100.04, 100.05, 100.06, 100.07);
numeric.mapping is left unset.
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_oracle_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "oracle-01-",
"table.whitelist" : "NUM_TEST",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'
Output data - all fields are written as bytes (including those specified as INT)
ksql> PRINT 'oracle-01-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/9/19 12:04:34 PM UTC, null, {"TXN_ID": {"bytes": "*"}, "CUSTOMER_ID": {"bytes": "*"}, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": {"bytes": "Øî"}, "AMOUNT_03": {"bytes": "d"}, "AMOUNT_04": {"bytes": "d"}, "AMOUNT_05": {"bytes": "'\u0015"}, "AMOUNT_06": {"bytes": "d"}, "AMOUNT_07": {"bytes": "\u0017\u0019> ëA³§f)Íî¥Ã¬\u0088z ±³ºÈ0µ)ÜÒ\u0095)\u008DííY\u0094u¹\u0004`\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000"}}
numeric.mapping is set to best_fit.
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_oracle_02",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "oracle-02-",
"table.whitelist" : "NUM_TEST",
"mode":"bulk",
"numeric.mapping": "best_fit",
"poll.interval.ms" : 3600000
}
}'
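Output data - fields with a declared precision below 19 are DOUBLE (FLOAT) or INT; the INT and DECIMAL fields (created as NUMBER(38)) and the unqualified NUMBER field remain as bytes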
ksql> PRINT 'oracle-02-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/9/19 12:04:52 PM UTC, null, {"TXN_ID": {"bytes": "*"}, "CUSTOMER_ID": {"bytes": "*"}, "AMOUNT_01": 100.01, "AMOUNT_02": -100.02, "AMOUNT_03": 100, "AMOUNT_04": {"bytes": "d"}, "AMOUNT_05": 100.05, "AMOUNT_06": 100, "AMOUNT_07": {"bytes": "\u0017\u0019> ëA³§f)Íî¥Ã¬\u0088z ±³ºÈ0µ)ÜÒ\u0095)\u008DííY\u0094u¹\u0004`\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000"}}
numeric.mapping is set to precision_only.
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_oracle_03",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "oracle-03-",
"table.whitelist" : "NUM_TEST",
"mode":"bulk",
"numeric.mapping": "precision_only",
"poll.interval.ms" : 3600000
}
}'
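Output data - only the zero-scale fields with a declared precision (AMOUNT_03 and AMOUNT_06) are INT; all other fields remain as bytes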
ksql> PRINT 'oracle-03-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/9/19 12:05:01 PM UTC, null, {"TXN_ID": {"bytes": "*"}, "CUSTOMER_ID": {"bytes": "*"}, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": {"bytes": "Øî"}, "AMOUNT_03": 100, "AMOUNT_04": {"bytes": "d"}, "AMOUNT_05": {"bytes": "'\u0015"}, "AMOUNT_06": 100, "AMOUNT_07": {"bytes": "\u0017\u0019> ëA³§f)Íî¥Ã¬\u0088z ±³ºÈ0µ)ÜÒ\u0095)\u008DííY\u0094u¹\u0004`\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000"}}
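numeric.mapping is set to best_fit, with a query that CASTs columns to NUMBER with a declared precision and scale.
Create connector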
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_oracle_04",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "oracle-04-NUM_TEST",
"mode":"bulk",
"numeric.mapping": "best_fit",
"query" : "SELECT CAST(TXN_ID AS NUMBER(5,0)) AS TXN_ID,CAST(CUSTOMER_ID AS NUMBER(5,0)) AS CUSTOMER_ID, AMOUNT_01 ,AMOUNT_02 ,AMOUNT_03 ,CAST(AMOUNT_04 AS NUMBER(5,2)) AS AMOUNT_04 ,AMOUNT_05, CAST(AMOUNT_06 AS NUMBER(5,2)) AS AMOUNT_06, CAST(AMOUNT_07 AS NUMBER(5,2)) AS AMOUNT_07 FROM NUM_TEST",
"poll.interval.ms" : 3600000
}
}'
Output data - all fields are DOUBLE (FLOAT) and INT as appropriate
ksql> PRINT 'oracle-04-NUM_TEST' FROM BEGINNING;
Format:AVRO
1/9/19 12:12:19 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": 100.01, "AMOUNT_02": -100.02, "AMOUNT_03": 100, "AMOUNT_04": 100.0, "AMOUNT_05": 100.05, "AMOUNT_06": 100.0, "AMOUNT_07": 100.07}
$ curl -s "http://localhost:8081/subjects/oracle-01-NUM_TEST-value/versions/latest"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_05",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_06",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_07",
"type": [
"null",
{
"type": "bytes",
"scale": 127,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "127"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
$ curl -s "http://localhost:8081/subjects/oracle-02-NUM_TEST-value/versions/latest"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_05",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_06",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "AMOUNT_07",
"type": [
"null",
{
"type": "bytes",
"scale": 127,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "127"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
$ curl -s "http://localhost:8081/subjects/oracle-03-NUM_TEST-value/versions/latest"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_05",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "AMOUNT_06",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "AMOUNT_07",
"type": [
"null",
{
"type": "bytes",
"scale": 127,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "127"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
$ curl -s "http://localhost:8081/subjects/oracle-04-NUM_TEST-value/versions/latest"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
"name": "AMOUNT_01",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_02",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_03",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "AMOUNT_04",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_05",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_06",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "AMOUNT_07",
"type": [
"null",
"double"
],
"default": null
}
| | amount_01 | amount_02 | amount_03 | amount_04 |
|---|---|---|---|---|
| Postgres column definition | DECIMAL(5,2) | NUMERIC(5,2) | DECIMAL(5) | DECIMAL |
| Source data in Postgres | 100.01 | -100.02 | 100 | 100 |
| numeric.mapping = none (same as leaving it unset) | Bytes '\u0011 | Bytes Øî | Bytes d | Bytes d |
| numeric.mapping = best_fit | Double 100.01 | Double -100.02 | Int 100 | Int 100 |
| numeric.mapping = precision_only | Bytes '\u0011 | Bytes Øî | Int 100 | Int 100 |
CREATE TABLE demo.NUM_TEST (
TXN_ID INT,
CUSTOMER_ID INT,
AMOUNT_01 DECIMAL(5,2),
AMOUNT_02 NUMERIC(5,2),
AMOUNT_03 DECIMAL(5),
AMOUNT_04 DECIMAL
);
-- The precision is the total number of significant digits stored for a value;
-- the scale is the number of digits that can be stored after the decimal point.
INSERT INTO demo.NUM_TEST VALUES (42,42,100.01, -100.02, 100, 100);
-- postgres=# SELECT * FROM demo.NUM_TEST;
-- txn_id | customer_id | amount_01 | amount_02 | amount_03 | amount_04
-- --------+-------------+-----------+-----------+-----------+-----------
-- 42 | 42 | 100.01 | -100.02 | 100 | 100.04
-- 42 | 42 | 200.01 | -200.02 | 200 | 200
-- (1 row)
-- postgres=# \d demo.NUM_TEST;
-- Table "demo.num_test"
-- Column | Type | Collation | Nullable | Default
-- -------------+--------------+-----------+----------+---------
-- txn_id | integer | | |
-- customer_id | integer | | |
-- amount_01 | numeric(5,2) | | |
-- amount_02 | numeric(5,2) | | |
-- amount_03 | numeric(5,0) | | |
-- amount_04 | numeric | | |
numeric.mapping is left unset.
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres_15",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "postgres-15-",
"table.whitelist" : "demo.num_test",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'
Output data - all NUMERIC fields are bytes
ksql> print 'postgres-15-num_test' from beginning;
Format:AVRO
1/7/19 4:25:13 PM UTC, null, {"txn_id": 42, "customer_id": 42, "amount_01": {"bytes": "'\u0011"}, "amount_02": {"bytes": "Øî"}, "amount_03": {"bytes": "d"}, "amount_04": {"bytes": "d"}}
Schema
$ curl -s "http://localhost:8081/subjects/postgres-15-num_test-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("amount"))'
{
"name": "amount_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_02",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_03",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
numeric.mapping is set to best_fit.
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres_12",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "postgres-12-",
"numeric.mapping": "best_fit",
"table.whitelist" : "demo.num_test",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'
Output data - NUMERIC fields are DOUBLE (FLOAT) and INT as appropriate
ksql> print 'postgres-12-num_test' from beginning;
Format:AVRO
1/7/19 4:27:08 PM UTC, null, {"txn_id": 42, "customer_id": 42, "amount_01": 100.01, "amount_02": -100.02, "amount_03": 100, "amount_04": 100}
Schema
$ curl -s "http://localhost:8081/subjects/postgres-12-num_test-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("amount"))'
{
"name": "amount_01",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "amount_02",
"type": [
"null",
"double"
],
"default": null
}
{
"name": "amount_03",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "amount_04",
"type": [
"null",
{
"type": "int",
"connect.type": "int8"
}
],
"default": null
}
numeric.mapping is explicitly set to none.
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres_13",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "postgres-13-",
"numeric.mapping": "none",
"table.whitelist" : "demo.num_test",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'
Output data - all NUMERIC fields are bytes
ksql> print 'postgres-13-num_test' from beginning;
Format:AVRO
1/7/19 4:27:52 PM UTC, null, {"txn_id": 42, "customer_id": 42, "amount_01": {"bytes": "'\u0011"}, "amount_02": {"bytes": "Øî"}, "amount_03": {"bytes": "d"}, "amount_04": {"bytes": "d"}}
Schema
$ curl -s "http://localhost:8081/subjects/postgres-13-num_test-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("amount"))'
{
"name": "amount_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_02",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_03",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_04",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
numeric.mapping is set to precision_only.
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres_14",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "postgres-14-",
"numeric.mapping": "precision_only",
"table.whitelist" : "demo.num_test",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'
Output data - zero scale NUMERIC fields are INT, others are still bytes
ksql> print 'postgres-14-num_test' from beginning;
Format:AVRO
1/7/19 4:05:57 PM UTC, null, {"txn_id": 42, "customer_id": 42, "amount_01": {"bytes": "'\u0011"}, "amount_02": {"bytes": "Øî"}, "amount_03": 100, "amount_04": 100}
Schema
$ curl -s "http://localhost:8081/subjects/postgres-14-num_test-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("amount"))'
{
"name": "amount_01",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_02",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
{
"name": "amount_03",
"type": [
"null",
"int"
],
"default": null
}
{
"name": "amount_04",
"type": [
"null",
{
"type": "int",
"connect.type": "int8"
}
],
"default": null
}
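If a column is treated as having zero scale but holds a value with a fractional part, the INSERT succeeds but the connector ignores that record when querying. Here amount_04 (declared as DECIMAL with no precision or scale, which the connector treats as scale 0) holds 100.04. A column with a declared zero scale, such as NUMERIC(5), does not hit this, because Postgres rounds values to the declared scale on INSERT.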
[2019-01-07 16:04:44,456] WARN Ignoring record due to SQL error: (io.confluent.connect.jdbc.source.BulkTableQuerier)
org.postgresql.util.PSQLException: Bad value for type BigDecimal : 100.04
at org.postgresql.jdbc2.AbstractJdbc2ResultSet.scaleBigDecimal(AbstractJdbc2ResultSet.java:3131)
at org.postgresql.jdbc2.AbstractJdbc2ResultSet.getBigDecimal(AbstractJdbc2ResultSet.java:2469)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$columnConverterFor$20(GenericDatabaseDialect.java:1177)
at io.confluent.connect.jdbc.source.SchemaMapping$FieldSetter.setField(SchemaMapping.java:158)
at io.confluent.connect.jdbc.source.BulkTableQuerier.extractRecord(BulkTableQuerier.java:76)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:309)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
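The exception comes from rescaling 100.04 to scale 0 without rounding. Judging by the stack trace (`scaleBigDecimal`), the driver appears to behave like Java's `BigDecimal.setScale(scale)`, which throws `ArithmeticException` when digits would be lost. A rough Python analogue using the `decimal` module, where trapping `Inexact` plays the role of that exception; `rescale_exact` is a hypothetical name:

```python
from decimal import Context, Decimal, Inexact


def rescale_exact(value: Decimal, scale: int) -> Decimal:
    """Change the scale of value, raising Inexact if that would drop non-zero digits."""
    strict = Context(traps=[Inexact])
    return value.quantize(Decimal(1).scaleb(-scale), context=strict)


print(rescale_exact(Decimal("100"), 0))      # fits: nothing is lost
print(rescale_exact(Decimal("100.01"), 2))   # fits: scale already matches
try:
    rescale_exact(Decimal("100.04"), 0)      # like amount_04 above
except Inexact:
    print("100.04 cannot be stored with scale 0 without rounding")
```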
Thank you for your work. I want to add that for Oracle, it is important to know the threshold for numeric.mapping=best_fit to recognize a column of type NUMBER(p,s) as an integer: p has to be less than 19 for the value to fit in a primitive data type (see also the code).
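The p<19 threshold follows from the size of the integer types: a precision-p integer can be as large as 10^p - 1, and 10^18 - 1 is the largest such value that still fits in a signed 64-bit long. The same arithmetic gives the smaller types seen in these tests, e.g. int (INT32) for NUMERIC(5) and long for SQL Server's default NUMERIC(18,0); the int8 for the unqualified Postgres numeric is consistent with the driver reporting a precision of 0. A sketch of the calculation, not the connector's code (`smallest_int_type` is a hypothetical name):

```python
def smallest_int_type(precision: int):
    """Smallest signed Connect integer type that holds every value of the given precision."""
    largest = 10 ** precision - 1  # e.g. precision 5 -> 99999
    for name, bits in [("INT8", 8), ("INT16", 16), ("INT32", 32), ("INT64", 64)]:
        if largest <= 2 ** (bits - 1) - 1:
            return name
    return None  # 19+ digits: only a Decimal (bytes) will do


for p in (0, 2, 4, 5, 9, 18, 19, 38):
    print(p, smallest_int_type(p))
```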
Thanks for this - it really helped me figure out how numeric.mapping works.