Created
February 11, 2015 03:10
-
-
Save hiroyuki-sato/ea4e245d819e52417f2c to your computer and use it in GitHub Desktop.
Embulk matching filter plugin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example Embulk configuration that reads gzip-compressed CSV files, appends
# two lookup columns with the "matching" filter plugin, and prints the
# resulting records to stdout.
exec: {}
in:
  type: file
  paths: [/home/arch/hoge/csv]
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
filters:
  - type: matching
    # YAML file holding the lookup tables (a hash of hashes).
    matching_file: /path/to/matching_file.yml
    # One entry per column appended to the output schema.
    mcolumns:
      - name: hoge        # name of the appended column
        type: string      # type of the appended column
        src: id           # input column whose value is looked up
        key: "hoge"       # top-level key in matching_file selecting the table
        default: "N/A"    # value used when the lookup finds nothing
      - name: hoge2
        type: string
        src: account
        key: "fuga"
        default: "N/A"
out: {type: stdout}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'yaml' | |
require 'pp' | |
module Embulk
  module Plugin
    # Embulk filter plugin that appends lookup ("matching") columns to every
    # record.
    #
    # Configuration:
    #
    #   matching_file: /path/to/matching_file.yml   # YAML hash of lookup tables
    #   mcolumns:
    #     - name: pref_name     # name of the appended column
    #       type: string        # type of the appended column (default: string)
    #       src: pref_code      # input column whose value is looked up
    #       key: pref_data      # top-level key in matching_file selecting the table
    #       default: "N/A"      # value used when the lookup finds nothing
    #
    # For each record and each mcolumns entry, the value of the +src+ column
    # is converted to a String and looked up in the table selected by +key+.
    # The result (or +default+) is appended to the record.
    class FilterMatching < FilterPlugin
      # filter plugin file name must be: embulk/filter_<name>.rb
      Plugin.register_filter('matching', self)

      # Validates the configuration, builds the task and the output schema,
      # and yields them to the caller.
      #
      # The task carries two entries:
      #
      #   matching_data: the lookup tables, with every key converted to String
      #   matching_info: one hash per mcolumns entry:
      #     - src_pos: 1          # position of the source column in the input schema
      #       key: pref_data      # lookup table name inside matching_data
      #       default: "N/A"      # fallback value (may be nil)
      #
      # @param config    plugin configuration; read through config.param
      # @param in_schema input columns; each element responds to #name
      # @yield [task, out_columns] the task hash and the input columns followed
      #   by the appended matching columns
      # @raise [RuntimeError] if matching_file is not a Hash, if an mcolumns
      #   entry has no name, if its src column is not in the input schema, or
      #   if its key does not select a Hash table in matching_file
      def self.transaction(config, in_schema, &control)
        matching_file = config.param('matching_file', :string)
        # NOTE(review): YAML.load_file can instantiate arbitrary objects on
        # older Psych versions. The file comes from the operator's own config,
        # but consider YAML.safe_load if it can ever be untrusted.
        matching_data = YAML.load_file(matching_file)
        raise "matching_file must be Hash" unless matching_data.kind_of?(Hash)
        matching_data = normalize_matching_data(matching_data)

        in_schema_size = in_schema.size
        add_columns = []
        matching_info = []

        mcolumns = config.param('mcolumns', :array)
        mcolumns.each_with_index do |mcolumn, i|
          name = mcolumn['name']
          type = mcolumn['type'] || :string
          key = mcolumn['key'].to_s
          src = mcolumn['src']

          raise "mcolumns[#{i}]: name is required" if name.nil?

          # Fail here with a clear message instead of a TypeError on
          # record[nil] once records start flowing.
          src_pos = in_schema.index { |column| column.name == src }
          if src_pos.nil?
            raise "mcolumns[#{i}] (#{name}): src column '#{src}' not found in input schema"
          end

          # Likewise, a missing table would otherwise surface as a
          # NoMethodError on nil for every record in #add.
          unless matching_data[key].kind_of?(Hash)
            raise "mcolumns[#{i}] (#{name}): key '#{key}' must be a Hash in matching_file"
          end

          # Appended columns are placed after all input columns.
          add_columns << Column.new(in_schema_size + i, name, type.to_sym)
          matching_info << { 'src_pos' => src_pos,
                             'key' => key,
                             'default' => mcolumn['default'] }
        end

        task = {
          'matching_data' => matching_data,
          'matching_info' => matching_info
        }
        out_columns = in_schema + add_columns

        puts "Matching filter started."
        yield(task, out_columns)
        puts "Matching filter finished."
      end

      # Returns a copy of the lookup data in which the top-level keys and the
      # keys of every nested Hash are Strings. #add looks values up by
      # String, while YAML yields Integer keys for entries such as "1: hoge1".
      # Non-Hash top-level values are kept unchanged.
      def self.normalize_matching_data(matching_data)
        matching_data.each_with_object({}) do |(table_name, table), normalized|
          normalized[table_name.to_s] =
            if table.kind_of?(Hash)
              table.each_with_object({}) { |(k, v), h| h[k.to_s] = v }
            else
              table
            end
        end
      end
      private_class_method :normalize_matching_data

      # Delegates all arguments to the superclass unchanged; #add and #finish
      # read @task and @page_builder afterwards.
      def initialize(task, in_schema, out_schema, page_builder)
        super
      end

      # Nothing to release.
      def close
      end

      # Appends the matching columns to every record of the page and passes
      # the extended record to the page builder.
      #
      # @param page enumerable of records; each record is indexed by column
      #   position and supports + with an Array
      def add(page)
        matching_data = @task['matching_data']

        # Resolve each lookup table once per page instead of once per record.
        lookups = @task['matching_info'].map do |info|
          [info['src_pos'], matching_data[info['key']], info['default']]
        end

        page.each do |record|
          add_columns = lookups.map do |src_pos, table, default|
            src = record[src_pos]
            map_val = table[src.to_s]
            map_val = default if map_val.nil?
            # Per-record trace written to stdout.
            puts "convert record[#{src_pos}] #{src} result #{map_val}"
            map_val
          end
          @page_builder.add(record + add_columns)
        end
      end

      # Signals the page builder that no more records will be added.
      def finish
        @page_builder.finish
      end
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Lookup tables for the "matching" filter plugin.
# Each top-level key names a table (referenced by `key` in mcolumns);
# each table maps a source column value to the value of the appended column.
hoge:
  1: hoge1
  2: hoge2
  3: hoge3
fuga:
  32864: fuga1
  14824: fuga2
  27559: fuga3
  11270: fuga4
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
java -jar embulk.jar -b my_bundle run example2.yml | |
2015-02-10 22:08:32,954 [INFO]: main:org.embulk.standards.LocalFileInputPlugin: Listing local files with prefix '/home/embulk/hoge/csv' | |
Matching filter started. | |
2015-02-10 22:08:33,197 [INFO]: main:org.embulk.exec.LocalExecutor: {done: 0 / 1, running: 0} | |
convert record[0] 1 result hoge1 | |
convert record[1] 32864 result fuga1 | |
convert record[0] 2 result hoge2 | |
convert record[1] 14824 result fuga2 | |
convert record[0] 3 result hoge3 | |
convert record[1] 27559 result fuga3 | |
convert record[0] 4 result N/A | |
convert record[1] 11270 result fuga4 | |
1,32864,2015-01-27 19:23:49.000000 +0000,2015-01-27 00:00:00.000000 +0000,embulk,hoge1,fuga1 | |
2,14824,2015-01-27 19:01:23.000000 +0000,2015-01-27 00:00:00.000000 +0000,embulk jruby,hoge2,fuga2 | |
3,27559,2015-01-28 02:20:02.000000 +0000,2015-01-28 00:00:00.000000 +0000,embulk core,hoge3,fuga3 | |
4,11270,2015-01-29 11:54:36.000000 +0000,2015-01-29 00:00:00.000000 +0000,Embulk "csv" parser plugin,N/A,fuga4 | |
2015-02-10 22:08:33,437 [INFO]: main:org.embulk.exec.LocalExecutor: {done: 1 / 1, running: 0} | |
Matching filter finished. | |
2015-02-10 22:08:33,438 [INFO]: main:org.embulk.command.Runner: next config: {"in":{},"out":{}} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment