Created
July 16, 2020 06:14
-
-
Save ohoroyoi/6a86ccb77f6796238c14ba8fabde429f to your computer and use it in GitHub Desktop.
Logstash: input from 3 PostgreSQL tables, output to 4 Elasticsearch indexes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Inputs: three scheduled JDBC queries against PostgreSQL (one per source
# table) plus stdin. Each jdbc input tags its events with the table name so
# later stages can tell them apart.
#
# NOTE(review): IP, PORT, DB_NAME and DRIVER_PATH appear to be placeholders;
#   fill them in before use.
# NOTE(review): useTimezone / useLegacyDatetimeCode / serverTimezone / useSSL
#   look like MySQL Connector/J URL options -- confirm the PostgreSQL driver
#   actually honours them.
# NOTE(review): credentials are hard-coded; consider ${ENV_VAR} substitution
#   or the Logstash keystore.
input {
  # expedia_region_union: polled every minute, incrementally by source_timestamp.
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://IP:PORT/DB_NAME?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
    jdbc_user => "atlas"
    jdbc_password => "atlaspw"
    jdbc_validate_connection => true
    jdbc_driver_library => "DRIVER_PATH"
    jdbc_driver_class => "org.postgresql.Driver"
    schedule => "* * * * *"
    # source_timestamp must be part of the result set: with
    # use_column_value => true the plugin reads the tracking column from the
    # returned rows to advance :sql_last_value. Rows are ordered so the last
    # row carries the newest timestamp.
    # NOTE(review): the "+ interval '9 hours'" offset presumably compensates
    #   for a UTC/KST difference -- confirm.
    statement => "SELECT region_id, region_type, country_code, country_code3, continent_code, source_from,
      st_asgeojson(center_geo_point)::text as center_geo_point_text, center_longitude, center_latitude, jsonn::text,
      source_timestamp
      from expedia_region_union where source_timestamp + interval '9 hours' > :sql_last_value
      order by source_timestamp"
    tracking_column => "source_timestamp"
    tracking_column_type => "timestamp"
    use_column_value => true
    clean_run => false
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
    tags => ["expedia_region_union"]
  }

  # expedia_airport_more: full reload every 10 minutes, fetched in pages.
  # NOTE(review): paging without an ORDER BY can return rows in an unstable
  #   order between pages -- consider adding one.
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://IP:PORT/DB_NAME?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
    jdbc_user => "atlas"
    jdbc_password => "atlaspw"
    jdbc_validate_connection => true
    jdbc_driver_library => "DRIVER_PATH"
    jdbc_driver_class => "org.postgresql.Driver"
    schedule => "*/10 * * * *"
    statement => "select id, iata_airport_code, name, name_full, country_code, center_latitude, center_longitude, name_kr as name_korean, name_full_kr as name_korean_full,
      st_asgeojson(center_geo_point)::text as center_geo_point_text, la, lo, public_flag, international_flag, source_from,
      source_time, iata_airport_metro_code, (select json_build_object('region_id', region_id, 'region_name', region_name,
      'region_name_kr', region_name_kr, 'region_type', region_type, 'region_code', region_code)::text from expedia_region_union where region_id = c.region_id) as haha_test from expedia_airport_more c"
    jdbc_paging_enabled => true
    jdbc_page_size => 100000
    tags => ["expedia_airport_more"]
  }

  # expedia_region_continent: full reload every 10 minutes.
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://IP:PORT/DB_NAME?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
    jdbc_user => "atlas"
    jdbc_password => "atlaspw"
    jdbc_validate_connection => true
    jdbc_driver_library => "DRIVER_PATH"
    jdbc_driver_class => "org.postgresql.Driver"
    schedule => "*/10 * * * *"
    statement => "select source_from, region_name_kr as region_name_korean, region_name_full_kr as region_name_korean_full,
      region_id, region_name_Full, continent_code, region_type, descendants::text as descendants_text from expedia_region_continent"
    tags => ["expedia_region_continent"]
  }

  # Manual input, decoded as UTF-8 plain text.
  stdin { codec => plain { charset => "UTF-8" } }
}
# Filters: reshape rows coming from the expedia_region_union query.
# Events from the other inputs pass through untouched.
filter {
  if "expedia_region_union" in [tags] {
    ruby {
      # Load the JSON library once at start-up rather than on every event.
      init => "require 'json'"
      code => "
        # 1) GeoJSON text -> structured 'point' field. A missing value yields
        #    an empty object, as before.
        begin
          geojson_text = event.get('center_geo_point_text')
          event.set('point', JSON.parse(geojson_text || '{}'))
          event.remove('center_geo_point_text')
        rescue StandardError
          event.tag('_center_geo_point_parse_failure')
        end

        # 2) Build a lat/lon 'location' object. It is skipped when either
        #    coordinate is missing, because nil.to_f would silently produce
        #    0.0 and place the record at (0, 0).
        begin
          lat = event.get('center_latitude')
          lon = event.get('center_longitude')
          unless lat.nil? || lon.nil?
            event.set('location', { 'lat' => lat.to_f, 'lon' => lon.to_f })
          end
        rescue StandardError
          event.tag('_location_build_failure')
        end

        # 3) Promote every key of the 'jsonn' JSON object to a top-level
        #    field, then drop the raw text. On a parse error the raw field is
        #    kept and the event is tagged.
        begin
          raw_jsonn = event.get('jsonn')
          unless raw_jsonn.nil?
            parsed = JSON.parse(raw_jsonn.to_s)
            parsed.each { |key, value| event.set(key, value) } if parsed.is_a?(Hash)
          end
          event.remove('jsonn')
        rescue StandardError
          event.tag('_jsonn_parse_failure')
        end
      "
    }

    # Parse the 'time' field as UNIX epoch seconds.
    # NOTE(review): 'time' is presumably one of the keys promoted from jsonn
    #   above -- confirm.
    date {
      match => ['time', 'UNIX']
    }

    # Emit one extra copy of each event, named 'test-union-007'.
    # NOTE(review): presumably used to route the copy to a second index in
    #   the output stage -- confirm.
    clone {
      clones => ['test-union-007']
    }
  }
}
# NOTE(review): no output plugins are configured in this section. The gist
#   description mentions four Elasticsearch indexes, so the elasticsearch
#   outputs were presumably omitted here -- add them before running.
output {}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment