Skip to content

Instantly share code, notes, and snippets.

@hiroyuki-sato
Created February 11, 2015 03:10
Show Gist options
  • Save hiroyuki-sato/ea4e245d819e52417f2c to your computer and use it in GitHub Desktop.
Save hiroyuki-sato/ea4e245d819e52417f2c to your computer and use it in GitHub Desktop.
Embulk matching filter plugin
exec: {}
in:
  type: file
  paths: [/home/arch/hoge/csv]
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
filters:
  - type: matching
    matching_file: /path/to/matching_file.yml
    mcolumns:
      - name: hoge
        type: string
        src: id
        key: "hoge"
        default: "N/A"
      - name: hoge2
        type: string
        src: account
        key: "fuga"
        default: "N/A"
out: {type: stdout}
require 'yaml'
require 'pp'
module Embulk
  module Plugin
    # Embulk filter plugin "matching".
    #
    # Appends one output column per entry of the `mcolumns` config option.
    # The value of each added column is looked up in a YAML "matching file":
    # the value of the source column (`src`) is converted with #to_s and used
    # as a key inside the matching-file section named by `key`. When the
    # lookup yields nil, the entry's `default` is used (nil if not configured).
    #
    # Example config:
    #
    #   filters:
    #     - type: matching
    #       matching_file: /path/to/matching_file.yml
    #       mcolumns:
    #         - name: pref_name   # name of the added column
    #           type: string      # type of the added column (defaults to string)
    #           src: pref_code    # input column whose value is looked up
    #           key: pref_data    # top-level section of the matching file
    #           default: "N/A"    # value used when the lookup yields nil
    class FilterMatching < FilterPlugin
      # filter plugin file name must be: embulk/filter_<name>.rb
      Plugin.register_filter('matching', self)

      # Loads the matching file, validates `mcolumns`, and yields the task and
      # the output columns (the input columns followed by the added columns).
      #
      # The yielded task holds:
      #   'matching_data' => { section(String) => { source_value(String) => mapped_value } }
      #   'matching_info' => [ { 'src_pos' => Integer, 'key' => String, 'default' => value }, ... ]
      #
      # @raise [RuntimeError] if the matching file is not a Hash, or an
      #   `mcolumns` entry lacks name/src/key, names an unknown input column,
      #   or refers to a section that is not a Hash in the matching file.
      def self.transaction(config, in_schema, &control)
        matching_file = config.param('matching_file', :string)
        matching_data = load_matching_data(matching_file)

        mcolumns = config.param('mcolumns', :array)
        add_columns = []
        matching_info = []
        mcolumns.each_with_index do |mcolumn, i|
          # Added columns are numbered after all input columns.
          column, info = build_mcolumn(mcolumn, in_schema, in_schema.size + i, matching_data)
          add_columns << column
          matching_info << info
        end

        task = {
          'matching_data' => matching_data,
          'matching_info' => matching_info,
        }
        out_columns = in_schema + add_columns

        puts "Matching filter started."
        yield(task, out_columns)
        puts "Matching filter finished."
      end

      # Reads the matching YAML file and converts every key of each Hash
      # section to a String, so the `src.to_s` lookup in #add also matches
      # numeric keys such as `1:` (which YAML loads as Integers).
      # Non-Hash sections are kept as-is; they are only rejected if an
      # `mcolumns` entry actually references them (see build_mcolumn).
      def self.load_matching_data(matching_file)
        matching_data = YAML.load_file(matching_file)
        raise "matching_file must be Hash" unless matching_data.kind_of?(Hash)
        matching_data.each_with_object({}) do |(section, mapping), result|
          result[section.to_s] =
            if mapping.kind_of?(Hash)
              mapping.each_with_object({}) { |(k, v), h| h[k.to_s] = v }
            else
              mapping
            end
        end
      end
      private_class_method :load_matching_data

      # Validates one `mcolumns` entry and returns [Column, matching_info entry].
      # All checks happen here, at transaction time, so misconfiguration fails
      # fast instead of raising on every record inside #add.
      def self.build_mcolumn(mcolumn, in_schema, mcol_pos, matching_data)
        name = mcolumn['name']
        type = mcolumn['type'] || :string
        key = mcolumn['key']
        src = mcolumn['src']
        default = mcolumn['default']

        raise "mcolumns: 'name' is required" if name.nil?
        raise "mcolumns[#{name}]: 'src' is required" if src.nil?
        raise "mcolumns[#{name}]: 'key' is required" if key.nil?

        src_pos = in_schema.index { |column| column.name == src }
        if src_pos.nil?
          raise "mcolumns[#{name}]: src column '#{src}' is not found in the input schema"
        end
        unless matching_data[key.to_s].kind_of?(Hash)
          raise "mcolumns[#{name}]: key '#{key}' is not a Hash section of matching_file"
        end

        column = Column.new(mcol_pos, name, type.to_sym)
        info = { 'src_pos' => src_pos, 'key' => key.to_s, 'default' => default }
        [column, info]
      end
      private_class_method :build_mcolumn

      def initialize(task, in_schema, out_schema, page_builder)
        super
      end

      def close
      end

      # Emits each input record with the looked-up values appended, in the
      # same order as `mcolumns`.
      def add(page)
        matching_info = @task['matching_info']
        matching_data = @task['matching_data']
        page.each do |record|
          added_values = matching_info.map do |info|
            src_pos = info['src_pos']
            src = record[src_pos]
            map_val = matching_data[info['key']][src.to_s]
            # Fall back to the default only when the lookup yields nil.
            map_val = info['default'] if map_val.nil?
            # NOTE(review): per-record stdout output; consider removing or
            # gating it for large inputs.
            puts "convert record[#{src_pos}] #{src} result #{map_val}"
            map_val
          end
          @page_builder.add(record + added_values)
        end
      end

      def finish
        @page_builder.finish
      end
    end
  end
end
hoge:
  1: hoge1
  2: hoge2
  3: hoge3
fuga:
  32864: fuga1
  14824: fuga2
  27559: fuga3
  11270: fuga4
java -jar embulk.jar -b my_bundle run example2.yml
2015-02-10 22:08:32,954 [INFO]: main:org.embulk.standards.LocalFileInputPlugin: Listing local files with prefix '/home/embulk/hoge/csv'
Matching filter started.
2015-02-10 22:08:33,197 [INFO]: main:org.embulk.exec.LocalExecutor: {done: 0 / 1, running: 0}
convert record[0] 1 result hoge1
convert record[1] 32864 result fuga1
convert record[0] 2 result hoge2
convert record[1] 14824 result fuga2
convert record[0] 3 result hoge3
convert record[1] 27559 result fuga3
convert record[0] 4 result N/A
convert record[1] 11270 result fuga4
1,32864,2015-01-27 19:23:49.000000 +0000,2015-01-27 00:00:00.000000 +0000,embulk,hoge1,fuga1
2,14824,2015-01-27 19:01:23.000000 +0000,2015-01-27 00:00:00.000000 +0000,embulk jruby,hoge2,fuga2
3,27559,2015-01-28 02:20:02.000000 +0000,2015-01-28 00:00:00.000000 +0000,embulk core,hoge3,fuga3
4,11270,2015-01-29 11:54:36.000000 +0000,2015-01-29 00:00:00.000000 +0000,Embulk "csv" parser plugin,N/A,fuga4
2015-02-10 22:08:33,437 [INFO]: main:org.embulk.exec.LocalExecutor: {done: 1 / 1, running: 0}
Matching filter finished.
2015-02-10 22:08:33,438 [INFO]: main:org.embulk.command.Runner: next config: {"in":{},"out":{}}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment