Created
April 13, 2011 22:45
-
-
Save wardbekker/918589 to your computer and use it in GitHub Desktop.
A very basic word-occurrence counter using map/reduce
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(main).

%% Public API: main/0 runs the whole job; map/1, reducer_wrapper/1 and
%% reducer/2 are the map step, the per-partition reducer process body and
%% the reduce step respectively.
-export([main/0, map/1, reducer_wrapper/1, reducer/2]).

%% Number of reducer processes; words are hashed into partitions 1..N.
-define(NUMBER_OF_PARTITIONS, 100).
%% Runs the word count over 'pg20417.txt'. Every line is mapped, which
%% emits {Word, [1]} to the reducer process owning that word's partition.
%% Then every reducer is told to reduce and print its totals.
%% NOTE: this returns as soon as the start_reduce signals have been sent;
%% the reducers print their counts asynchronously afterwards.
main() ->
    Lines = readlines('pg20417.txt'),
    %% Use a local fun rather than `fun main:map/1`, so the module name is
    %% not hard-coded (the rest of the file uses ?MODULE). This also avoids
    %% the export-routed remote call.
    lists:foreach(fun map/1, Lines),
    start_reduce().
%% Tells the reducer of every partition (1..?NUMBER_OF_PARTITIONS) that all
%% emits have been sent, so it can fold its buffered pairs and print them.
%% get_reducer/1 spawns a reducer for a partition that never received a
%% word; that reducer simply prints nothing.
start_reduce() ->
    [get_reducer(PartitionIndex) ! start_reduce
     || PartitionIndex <- lists:seq(1, ?NUMBER_OF_PARTITIONS)],
    ok.
%% Map step: lowercases Line, splits it into words and emits {Word, [1]}
%% for each word to the reducer that owns the word's partition.
%% '\r' and '\t' are separators because io:get_line/2 returns lines of a
%% CRLF-terminated file as "...\r\n". Without '\r', the last word of every
%% such line would be counted as a different word ("end\r" instead of
%% "end"). '!' and '"' are split on for the same reason ("stop!" vs "stop").
map(Line) ->
    Words = string:tokens(string:to_lower(Line), word_separators()),
    lists:foreach(
        fun(Word) ->
            get_reducer(Word, ?NUMBER_OF_PARTITIONS) ! {emit, {Word, [1]}}
        end,
        Words).

%% Characters that map/1 treats as word boundaries.
word_separators() ->
    " \t\r\n.,;:()?!\"".
%% Returns the pid of the reducer for partition PartitionNr. The reducer is
%% registered under the atom '<PartitionNr>_reducer'. If nothing is
%% registered under that name yet, a fresh reducer_wrapper with an empty
%% buffer is spawned and registered first.
get_reducer(PartitionNr) ->
    Name = list_to_atom(lists:concat([PartitionNr, '_reducer'])),
    registered_or_spawned(Name, whereis(Name)).

%% Dispatches on the whereis/1 result: spawn and register on a miss,
%% otherwise hand back the pid that is already registered.
registered_or_spawned(Name, undefined) ->
    NewPid = spawn(?MODULE, reducer_wrapper, [[]]),
    true = register(Name, NewPid),
    NewPid;
registered_or_spawned(_Name, ExistingPid) ->
    ExistingPid.
%% Returns the reducer pid responsible for Key. Keys are hashed into
%% partitions 1..NumberOfPartitions; NumberOfPartitions must be >= 1.
get_reducer(Key, NumberOfPartitions) ->
    %% phash2/2 hashes directly into 0..Range-1. The old approach took the
    %% 0..2^27-1 result of phash2/1 modulo the partition count.
    get_reducer(erlang:phash2(Key, NumberOfPartitions) + 1).
%% The reducer process body.
%% KeyValueTuples buffers every {Key, [Count]} emitted to this partition,
%% most recent first, until start_reduce arrives. At that point the buffer
%% is folded into a dict with reducer/2, the totals are printed, and the
%% function returns, which ends the process.
reducer_wrapper(KeyValueTuples) ->
    receive
        {emit, KeyValueTuple} ->
            reducer_wrapper([KeyValueTuple | KeyValueTuples]);
        start_reduce ->
            %% Use a local fun instead of `fun main:reducer/2`, so the module
            %% name is not hard-coded (the spawn uses ?MODULE).
            Dictionary = lists:foldl(fun reducer/2, dict:new(), KeyValueTuples),
            show_all(dict:to_list(Dictionary))
    end.
%% Reduce step: folds one emitted {Key, [Cnt]} pair into Dictionary.
%% Cnt is added to Key's running total; if Key is new, its total is Cnt.
%% dict:update_counter/3 does this in a single lookup, replacing the
%% is_key + fetch + store sequence.
reducer({Key, [Cnt]}, Dictionary) ->
    dict:update_counter(Key, Cnt, Dictionary).
%% Reads FileName and returns its lines in file order as a list of strings.
%% Each line keeps its trailing newline, if it has one. The file is always
%% closed, even when reading fails. A read error raises
%% error({read_error, Reason}).
readlines(FileName) ->
    {ok, Device} = file:open(FileName, [read]),
    try
        get_all_lines(Device, [])
    after
        file:close(Device)
    end.

%% Collects lines in reverse order (an O(1) cons per line) and reverses the
%% list once at eof. The old version appended with Accum ++ [Line], which
%% is O(n^2) overall.
%% An {error, Reason} from io:get_line/2 is raised instead of being
%% treated as a line, which used to loop forever.
get_all_lines(Device, Accum) ->
    case io:get_line(Device, "") of
        eof -> lists:reverse(Accum);
        {error, Reason} -> erlang:error({read_error, Reason});
        Line -> get_all_lines(Device, [Line | Accum])
    end.
%% Prints each element of the list on its own line using ~p, then returns
%% the atom void.
show_all(Entries) ->
    lists:foreach(fun(Entry) -> io:format("~p~n", [Entry]) end, Entries),
    void.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment