**embulk-0.4.0** 以降では動かないと思います。
そのうちどなたかが Java で実装しそうなので……
out:
  type: fluent_logger_ruby
  host: 192.168.xx.xx # default localhost
  port: 24224 # default 24224
  tag: debug.test

require 'fluent-logger'
module Embulk
  module Plugin
    # Embulk output plugin that posts every incoming record to Fluentd
    # through fluent-logger, using a single configured tag.
    #
    # Configuration keys read in .transaction:
    #   host - Fluentd host (string, default "localhost")
    #   port - Fluentd port (integer, default 24224)
    #   tag  - tag passed to every post (string, required)
    class FluentLoggerRuby < OutputPlugin
      # output plugin file name must be: embulk/output_<name>.rb
      Plugin.register_output('fluent_logger_ruby', self)

      # Builds the task hash from the plugin configuration and hands it to
      # .resume, forwarding the control block.
      #
      # Returns whatever .resume returns (an empty config diff).
      def self.transaction(config, schema, count, &control)
        task = {
          'host' => config.param('host', :string, default: "localhost"),
          'port' => config.param('port', :integer, default: 24224),
          'tag' => config.param('tag', :string)
        }
        resume(task, schema, count, &control)
      end

      # Yields the task to the control block, prints the commit reports it
      # returns, and returns an empty hash as the next config diff.
      def self.resume(task, schema, count, &control)
        puts "FluentLoggerRuby output started."
        commit_reports = yield(task)
        puts "FluentLoggerRuby output finished. Commit reports = #{commit_reports.to_json}"
        {}
      end

      # Opens one fluent-logger connection per output instance and resets
      # the per-instance counters.
      def initialize(task, schema, index)
        puts "FluentLoggerRuby output thread #{index}..."
        super
        @tag = task['tag']
        @logger = Fluent::Logger::FluentLogger.new(nil, :host => task['host'], :port => task['port'])
        @records = 0
        @failed_posts = 0
      end

      # Releases the fluent-logger connection opened in #initialize.
      # The original left this empty, so the logger was never closed.
      def close
        @logger.close if @logger
      end

      # Posts each record of the page as a { column name => value } hash.
      #
      # Hash[] is kept instead of Array#to_h so the code still runs on
      # pre-2.1 Ruby runtimes.
      def add(page)
        page.each do |record|
          hash = Hash[schema.names.zip(record)]
          # A falsy return from #post is treated as a failed post and
          # counted, so it shows up in the commit report instead of being
          # dropped silently.
          # NOTE(review): fluent-logger presumably buffers such records for
          # a later retry - confirm before treating this count as data loss.
          @failed_posts += 1 unless @logger.post(@tag, hash)
          @records += 1
        end
      end

      def finish
      end

      def abort
      end

      # Returns this instance's commit report: the number of records seen
      # and the number of posts that reported failure.
      def commit
        {
          "records" => @records,
          "failed_posts" => @failed_posts
        }
      end
    end
  end
end