Skip to content

Instantly share code, notes, and snippets.

@shawn42
Created September 4, 2012 11:17
Show Gist options
  • Save shawn42/3620326 to your computer and use it in GitHub Desktop.
Save shawn42/3620326 to your computer and use it in GitHub Desktop.
Piece Piece map, aggregate, group-by
# Using group-by
# no extra step definition is required
PiecePipe::Pipeline.new.
source([{region: region}]).
step(FetchPowerPlantsByRegion).
group_by(:region).
to_enum
# each output would be a region => [plant_1_radiation, plant_2_radiation, ...]
# Using Map-Aggregate
PiecePipe::Pipeline.new.
source([{region: region}]).
step(FetchPowerPlantsByRegion).
step(KeyByRegion).
step(HashAggregator).
to_enum
# Expects inputs to be a Hash with :region, :date and :calories.
# Output will look like: { :some_region => original_item_including_region }
class KeyByRegion < PiecePipe::MapStep
def map(inputs)
key = inputs[:region]
val = inputs
emit key, val
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment