I created this to help me run benchmarks/comparisons against Universal ID, but it could serve as the foundation for a robust ETL data pipeline... and it's fewer than 70 LOC right now! 🤯 🚀
It handles the extract and transform parts of an ETL process and supports the following options:
- `only` - specify which attributes to include
- `except` - specify which attributes to omit
- `copy` - make a copy of the record (omit pks, fks, and timestamps)
- `nested_attributes` - include child associations in the same format that forms use
- `reject_blank` - omit blank values to reduce payload size
Campaign.first.extract # => { ... } Hash of extracted data
Campaign.first.transform # => "{ ... }" JSON string of extracted data

# customize things
Campaign.first.extract except: ["description"], reject_blank: true
Campaign.first.transform except: ["description"], reject_blank: true

# make a shallow copy that omits pk, fks, and timestamps
Campaign.first.extract copy: true
Campaign.first.transform copy: true

# make a deep copy and save an entirely new set of records with a single save operation
# (extract returns a plain Hash, so build a new record from it before saving)
deep_copy = Campaign.new(Campaign.first.extract(copy: true, nested_attributes: true))
deep_copy.save
require_relative "active_record_etl"
# Abstract base class for the app's models.
#
# Every model gains `extract` and `transform`; both are forwarded to a
# brand-new ActiveRecordETL pipeline built around the receiving record.
class ApplicationRecord < ActiveRecord::Base
  include Testable
  self.abstract_class = true

  delegate :extract, :transform, to: :data_pipeline

  # Wraps this record in a fresh ETL pipeline (a new one on every call).
  # @return [ActiveRecordETL]
  def data_pipeline
    ActiveRecordETL.new(self)
  end
end
# ...
# A campaign with many emails. Because it `accepts_nested_attributes_for
# :emails`, ActiveRecordETL#extract(nested_attributes: true) includes the
# emails under an "emails_attributes" key.
class Campaign < ApplicationRecord
# dependent: :destroy - emails are destroyed together with their campaign.
has_many :emails, dependent: :destroy
# Declared after the association it configures; keep this order.
accepts_nested_attributes_for :emails
end
# ...
# An email that belongs to a campaign and owns many attachments.
# With ActiveRecordETL's `copy: true`, the belongs_to foreign key is omitted
# (see ActiveRecordETL#parent_attribute_names).
class Email < ApplicationRecord
belongs_to :campaign
# dependent: :destroy - attachments are destroyed together with their email.
has_many :attachments, dependent: :destroy
# Lets nested extraction emit "attachments_attributes"; declared after the
# association it configures, keep this order.
accepts_nested_attributes_for :attachments
end
# ...
# An attachment on an email. It is a leaf: it declares no nested attributes,
# so nested extraction stops here.
class Attachment < ApplicationRecord
# Its foreign key is omitted by ActiveRecordETL when extracting with `copy: true`.
belongs_to :email
end
# active_record_etl.rb
# Extract/transform pipeline for a single ActiveRecord record.
#
# `extract` produces a Hash keyed by attribute name (String). With
# `nested_attributes: true`, every association the model
# `accepts_nested_attributes_for` is added under a "<name>_attributes" key,
# recursively, so the result has the shape nested-attribute forms submit.
class ActiveRecordETL
  attr_reader :record

  # Initializes a new ActiveRecord ETL Data Pipeline
  # @param record [ActiveRecord::Base] the record to ETL
  def initialize(record)
    @record = record
  end

  # The record's attributes
  # @return [Hash{String => Object}] the record's attributes
  def attributes
    record.attributes
  end

  # The record's primary key name, exactly as the model class reports it.
  # NOTE(review): may be nil or an Array (composite keys) depending on the
  # model / Rails version; the copy logic below treats it as a list.
  # @return [String, Array<String>, nil]
  def primary_key
    record.class.primary_key
  end

  # Attribute names that the record `accepts_nested_attributes_for`
  # @return [Array<String>]
  def nested_attribute_names
    record.class.nested_attributes_options.keys.map(&:to_s)
  end

  # Foreign key names for all of the record's `belongs_to` associations.
  # Symbol and composite (Array) foreign keys are flattened and stringified so
  # they can be compared with the String keys of #attributes.
  # @return [Array<String>]
  def parent_attribute_names
    record.class.reflect_on_all_associations(:belongs_to)
          .flat_map { |reflection| Array(reflection.foreign_key) }
          .map(&:to_s)
  end

  # Attribute names for the record's timestamps
  # @return [Array<String>] a copy the caller may mutate
  def timestamp_attribute_names
    record.class.all_timestamp_attributes_in_model.dup
  end

  # Extracts data from the record
  #
  # :only and :except are applied to nested records as well; the keys omitted
  # by :copy are computed separately for each record.
  #
  # @param :except [Array<String, Symbol>, String, Symbol] Attributes to omit (optional, trumps :only, defaults to [])
  # @param :only [Array<String, Symbol>, String, Symbol] Attributes to extract; empty means all (optional, defaults to [])
  # @param :copy [Boolean] Whether or not to omit the primary key, belongs_to foreign keys and timestamps (optional, defaults to false)
  # @param :nested_attributes [Boolean] Indicates if nested attributes should be included (optional, defaults to false)
  # @param :reject_blank [Boolean] Indicates if blank values should be omitted (optional, defaults to false)
  # @return [Hash{String => Object}] The extracted data
  def extract(**options)
    settings = normalize_options(**options)
    data = attributes.each_with_object({}) do |(name, value), memo|
      memo[name] = value unless skip?(name, value, settings)
    end
    data.merge!(extract_nested_attributes(settings, options)) if settings[:nested_attributes]
    data
  end

  # Transforms the record into the specified data format
  #
  # @param :format [Symbol, String] The data format to transform the record into (optional, defaults to :json)
  # @param :except [Array<String, Symbol>, String, Symbol] Attributes to omit (optional, trumps :only, defaults to [])
  # @param :only [Array<String, Symbol>, String, Symbol] Attributes to extract (optional, defaults to [])
  # @param :copy [Boolean] Whether or not to omit keys and timestamps (optional, defaults to false)
  # @param :nested_attributes [Boolean] Indicates if nested attributes should be included (optional, defaults to false)
  # @param :reject_blank [Boolean] Indicates if blank values should be omitted (optional, defaults to false)
  # @return [String] the transformed data
  # @raise [NotImplementedError] if the specified format is not supported.
  def transform(format: :json, **options)
    case format.to_s
    when "json" then extract(**options).to_json
    else raise NotImplementedError, "Unsupported transform format: #{format.inspect}"
    end
  end

  private

  # Builds the "<association>_attributes" entries for every nested association.
  # Children get the caller's *raw* options, so the pk/fk/timestamp names this
  # record added to :except for :copy do not leak into the children's exclusions.
  def extract_nested_attributes(settings, options)
    nested_attribute_names.each_with_object({}) do |name, memo|
      value = record.public_send(name)
      # An unset singular association has nothing to extract (and extracting
      # nil would fail), so it is omitted.
      next if value.nil? || skip?(name, value, settings)

      memo["#{name}_attributes"] = extract_association(name, value, options)
    end
  end

  # Collection associations become an Array of hashes; singular ones a Hash.
  # Falls back to the collection shape if no reflection is found.
  def extract_association(name, value, options)
    reflection = record.class.reflect_on_association(name)
    if reflection.nil? || reflection.collection?
      value.map { |child| extract_next(child, **options) }
    else
      extract_next(value, **options)
    end
  end

  def extract_next(child, **options)
    self.class.new(child).extract(**options)
  end

  def normalize_only_values(**options)
    Array(options[:only]).map(&:to_s)
  end

  def normalize_except_values(**options)
    except = Array(options[:except]).map(&:to_s)
    except.concat(copy_omitted_attribute_names) if options[:copy]
    except
  end

  # Names dropped by `copy: true`: primary key(s), belongs_to foreign keys and
  # timestamps, all as Strings.
  def copy_omitted_attribute_names
    [*Array(primary_key), *parent_attribute_names, *timestamp_attribute_names].map(&:to_s)
  end

  # Returns a new options hash with :only / :except normalized to String arrays.
  def normalize_options(**options)
    options.merge(
      only: normalize_only_values(**options),
      except: normalize_except_values(**options)
    )
  end

  # Whether +name+ / +value+ should be left out of the extracted data.
  # The blank check runs last so associations are only inspected when
  # :reject_blank is actually requested.
  def skip?(name, value, settings)
    return true if settings[:except].include?(name)
    return true if settings[:only].any? && !settings[:only].include?(name)

    settings[:reject_blank] ? value.blank? : false
  end
end