Created
January 12, 2018 20:16
-
-
Save yaauie/f87c9ee2841bccb4e6969c70bb90631f to your computer and use it in GitHub Desktop.
logstash-filters-cache-common
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class Logstash::Filters::CacheCommon < LogStash::Filters::Base | |
| config :get, validate: :hash, required: false | |
| config :set, validate: :hash, required: false | |
| def register | |
| @get ||= {} | |
| @set ||= {} | |
| if (@get.empty? && @set.empty?) | |
| logger.error("plugin has no directives") | |
| fail("plugin has no directives; add `set` or `get` directive") | |
| end | |
| logger.info("cache configured", log_context) | |
| end | |
| def filter(event) | |
| do_set(event) | |
| do_get(event) | |
| filter_matched(event) | |
| end | |
| ## | |
| # @api private | |
| def do_get(event) | |
| return unless @get && !@get.empty? | |
| event_fields_by_memcached_key = @get.each_with_object({}) do |(memcached_key_template, event_field), memo| | |
| memcached_key = event.sprintf(memcached_key_template) | |
| memo[memcached_key] = event_field | |
| end | |
| memcached_keys = event_fields_by_memcached_key.keys | |
| cache_hits_by_memcached_key = cache_mget(memcached_keys) | |
| event_fields_by_memcached_key.each do |memcached_key, event_field| | |
| value = cache_hits_by_memcached_key[memcached_key] | |
| if value.nil? | |
| logger.trace("cache:get miss", log_context(key: memcached_key)) | |
| else | |
| logger.trace("cache:get hit", log_context(key: memcached_key, value: value)) | |
| event.set(event_field, value) | |
| end | |
| end | |
| end | |
| ## | |
| # @api private | |
| def do_set(event) | |
| return unless @set && !@set.empty? | |
| values_by_memcached_key = @set.each_with_object({}) do |(event_field, memcached_key_template), memo| | |
| memcached_key = event.sprintf(memcached_key_template) | |
| value = event.get(event_field) | |
| memo[memcached_key] = value unless value.nil? | |
| end | |
| cache_mset(values_by_memcached_key) | |
| end | |
| ## | |
| # Override with multi-get operation if your cache supports getting many | |
| # values with a single operation | |
| # @api public | |
| # @param keys [Array{String}] | |
| # @return [Hash{String => Object}] | |
| def cache_mget(keys) | |
| keys.each_with_object({}) do |key, memo| | |
| memo[key] = cache_get(key) | |
| end | |
| end | |
| ## | |
| # Override with multi-set operation if your cache supports setting many | |
| # values with a single operation | |
| # @api public | |
| # @param key_value_map [Hash{String => Object}] | |
| # @return [void] | |
| def cache_mset(key_value_map) | |
| key_value_map.each do |key, value| | |
| cache_set(key, value) | |
| end | |
| end | |
| ## | |
| # Override with single-get operation; if you implement `cache_mget`, | |
| # you do not need to also implement this. | |
| # @param key [String] | |
| # @return [Object, nil] | |
| def cache_get(key) | |
| fail NotImplementedError | |
| end | |
| ## | |
| # Override with single-set operation; if you implement `cache_mset`, | |
| # you do not need to also implement this. | |
| # @param key [String] | |
| # @param value [Object] | |
| # @return [void] | |
| def cache_set(key, value) | |
| fail NotImplementedError | |
| end | |
| ## | |
| # @api private (see `Logstash::Filters::CacheCommon#base_log_context`) | |
| # @param additional_context [Hash{#to_s=>#to_json}] | |
| # @return [Hash{#to_s=>#to_json}] | |
| def log_context(additional_context={}) | |
| return base_log_context if additional_context.empty? | |
| base_log_context.merge(additional_context) | |
| end | |
| ## | |
| # Override with "base" context for log events, which includes details about | |
| # connection configuration, relevant plugin settings, etc. | |
| # @return [Hash{#to_s=>#to_json}] | |
| def base_log_context | |
| @base_log_context ||= {}.freeze | |
| end | |
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class Logstash::Filters::Memcached < Logstash::Filters::CacheCommon | |
| include Logstash::PluginMixins::Memcached # provides configurable dalli_client | |
| config_name 'memcached' | |
| def cache_mget(keys) | |
| dalli_client.get_multi(keys) | |
| end | |
| def cache_set(key, value) | |
| dalli_client.set(key, value) | |
| end | |
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Common Memcached connection config, can be included into Input, Output, or Filter classes. | |
| module Logstash::PluginMixins::Memcached | |
| def self.included(base) | |
| if !base.kind_of?(Class) || !base.ancestors.include?(LogStash::Config::Mixin) | |
| fail("`#{base}` is not a configurable logstash plugin") | |
| end | |
| base.instance_exec do | |
| config :hosts, validate: :array, default: %w(localhost:11211) | |
| config :namespace, validate: :string, required: false | |
| config :ttl, validate: :number, default: 0 | |
| end | |
| end | |
| ## | |
| # If your concrete implementation defines `register`, don't forget | |
| # to send `super`. | |
| def register | |
| super if defined?(super) | |
| if @ttl < 0 | |
| logger.error("ttl cannot be negative") | |
| fail("invalid ttl: cannot be negative") | |
| end | |
| @dalli_client= establish_dalli_connection | |
| end | |
| protected | |
| ## | |
| # @api protected | |
| # @return [Dalli::Client] | |
| def dalli_client | |
| @dalli_client | |
| end | |
| private | |
| def establish_dalli_connection | |
| require 'dalli' | |
| hosts = valid_connection_hosts | |
| options = valid_connection_options | |
| logger.info('connecting to memcached', log_context(hosts: hosts, options: options)) | |
| Dalli::Client.new(@hosts, options).tap do |client| | |
| begin | |
| client.alive! | |
| rescue Dalli::RingError | |
| logger.error("failed to connect", log_context(hosts: hosts, options: options)) | |
| fail("cannot connect to memcached") | |
| end | |
| end | |
| end | |
| def valid_connection_options | |
| {}.tap do |options| | |
| options[:ttl] = @ttl | |
| options[:namespace] = @namespace unless @namespace.nil? || @namespace.empty? | |
| end | |
| end | |
| def valid_connection_hosts | |
| logger.error("configuration: hosts empty!") && fail if @hosts.empty? | |
| @hosts.map(&:to_s) | |
| end | |
| def base_log_context | |
| super().merge( | |
| {}.tap do |context| | |
| context[:hosts] = @hosts | |
| context[:ttl] = @ttl | |
| context[:namespace] = @namespace unless @namespace.nil? || @namespace.empty? | |
| end).freeze | |
| end | |
| end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment