Skip to content

Instantly share code, notes, and snippets.

@wardbekker
Created April 13, 2011 22:45
Show Gist options
  • Save wardbekker/918589 to your computer and use it in GitHub Desktop.
Save wardbekker/918589 to your computer and use it in GitHub Desktop.
A very basic word occurrence counter using map/reduce
-module(main).
-export([map/1]).
-export([main/0]).
-export([reducer_wrapper/1]).
-export([reducer/2]).
-define(NUMBER_OF_PARTITIONS, 100).
%% Entry point: reads every line of 'pg20417.txt', feeds each line to map/1
%% (which emits words to the reducer processes), and then tells the reducers
%% to start reducing.
main() ->
    lists:foreach(fun main:map/1, readlines('pg20417.txt')),
    start_reduce().
%% Sends the `start_reduce` message to the reducer of every partition
%% (1..?NUMBER_OF_PARTITIONS), signalling that all emits have been sent and
%% the actual reduce can begin.
start_reduce() ->
    Partitions = lists:seq(1, ?NUMBER_OF_PARTITIONS),
    Signal = fun(PartitionNr) ->
        ReducerPid = get_reducer(PartitionNr),
        ReducerPid ! start_reduce
    end,
    lists:foreach(Signal, Partitions).
%% Map step: lower-cases Line, splits it into words on the separator
%% characters below, and emits {Word, [1]} for every word to the reducer
%% responsible for that word.
map(Line) ->
    Lowered = string:to_lower(Line),
    Emit = fun(Word) ->
        ReducerPid = get_reducer(Word, ?NUMBER_OF_PARTITIONS),
        ReducerPid ! {emit, {Word, [1]}}
    end,
    lists:foreach(Emit, string:tokens(Lowered, " .,;:\n()?")).
%% Returns the pid registered for partition PartitionNr under the name
%% '<PartitionNr>_reducer'. When nothing is registered under that name yet,
%% a new reducer_wrapper process with an empty accumulator is spawned,
%% registered under the name, and returned.
get_reducer(PartitionNr) ->
    RegisteredName = list_to_atom(lists:concat([PartitionNr, '_reducer'])),
    case whereis(RegisteredName) of
        undefined ->
            NewPid = spawn(?MODULE, reducer_wrapper, [[]]),
            register(RegisteredName, NewPid),
            NewPid;
        Existing ->
            Existing
    end.
%% Returns the reducer pid for Key: the key is hashed with erlang:phash2/1
%% and mapped onto a partition number in 1..NumberOfPartitions.
get_reducer(Key, NumberOfPartitions) ->
    PartitionNr = erlang:phash2(Key) rem NumberOfPartitions + 1,
    get_reducer(PartitionNr).
%% The reducer wrapper process loop.
%% KeyValueTuples is the list of tuples received so far (newest first).
%%   {emit, KeyValueTuple} - prepend the tuple and keep waiting.
%%   start_reduce          - fold all collected tuples into a dict with
%%                           reducer/2, print every entry, and return.
reducer_wrapper(KeyValueTuples) ->
    receive
        start_reduce ->
            Counts = lists:foldl(fun main:reducer/2, dict:new(), KeyValueTuples),
            show_all(dict:to_list(Counts));
        {emit, KeyValueTuple} ->
            reducer_wrapper([KeyValueTuple | KeyValueTuples])
    end.
%% Reduce step for a single emitted tuple.
%% Adds Cnt to the running total stored under Key in Dictionary; when Key is
%% not present yet, Cnt becomes its initial value. Returns the updated dict.
%% dict:update_counter/3 performs this "insert or increment" in one call,
%% replacing the separate is_key / fetch / store lookups.
reducer({Key, [Cnt]}, Dictionary) ->
    dict:update_counter(Key, Cnt, Dictionary).
%% Opens FileName for reading and returns the result of get_all_lines/2
%% started with an empty accumulator. Crashes with badmatch when the file
%% cannot be opened.
readlines(FileName) ->
    OpenResult = file:open(FileName, [read]),
    {ok, IoDevice} = OpenResult,
    get_all_lines(IoDevice, []).
%% Reads all remaining lines from Device, closes it, and returns
%% Accum followed by the lines in file order.
%% Raises error({read_failed, Reason}) (after closing Device) when
%% io:get_line/2 reports {error, Reason}, instead of treating the error tuple
%% as a line and reading on.
get_all_lines(Device, Accum) ->
    Accum ++ read_remaining_lines(Device, []).

%% Collects lines newest-first (O(1) prepend per line) and reverses once at
%% end of file, avoiding the quadratic cost of appending to the list tail.
read_remaining_lines(Device, ReversedLines) ->
    case io:get_line(Device, "") of
        eof ->
            file:close(Device),
            lists:reverse(ReversedLines);
        {error, Reason} ->
            file:close(Device),
            error({read_failed, Reason});
        Line ->
            read_remaining_lines(Device, [Line | ReversedLines])
    end.
%% Prints every element of the list on its own line using the ~p format,
%% in list order, and returns the atom void.
show_all(Items) ->
    lists:foreach(fun(Item) -> io:format("~p~n", [Item]) end, Items),
    void.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment