CREATE SERVER server_name [ TYPE 'server_type' ] [ VERSION 'server_version' ]
FOREIGN DATA WRAPPER fdw_name
[ OPTIONS ( option 'value' [, ... ] ) ]
CREATE FOREIGN TABLE [ IF NOT EXISTS ] table_name ( [
{ column_name data_type [ OPTIONS ( option 'value' [, ... ] ) ] [ COLLATE collation ] [ column_constraint [ ... ] ]
| table_constraint }
[, ... ]
] )
[ INHERITS ( parent_table [, ... ] ) ]
SERVER server_name
[ OPTIONS ( option 'value' [, ... ] ) ]
[ ]
name |
host |
segid |
csv_file_path |
seg0 |
host0 |
1 |
/var/file/1/file.csv |
seg1 |
host0 |
2 |
/var/file/2/file.csv |
seg2 |
host2 |
3 |
/var/file/3/file.csv |
CREATE EXTENSION file_fdw;
CREATE SERVER fs FOREIGN DATA WRAPPER file_fdw;
CREATE FOREIGN TABLE external_tsv
(id char(3), i integer, name text)
SERVER fs
OPTIONS (filename '/path/to/<segid>/file.csv', encoding="gb2312", option_mapper 'mapper_callback')
MPP_OPTIONS(option_mapper 'mapper_callback');
char*
default_mapper_callback(char* key, char* value, int segid)
{
if (strcmp(keys[i], "filename") == 0)
{
return strreplace("value", "<segid>", segid);
}
return NULL;
}
CREATE SERVER kafka_server
FOREIGN DATA WRAPPER kafka_fdw
OPTIONS (brokers 'localhost:9092');
CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;
CREATE FOREIGN TABLE kafka_test (
part int OPTIONS (partition 'true'),
offs bigint OPTIONS (offset 'true'),
some_int int,
some_text text,
some_date date,
some_time timestamp
) SERVER kafka_server OPTIONS
(format 'csv', topic 'contrib_regress', batch_size '30', buffer_delay '100');