Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save ThomasAlexandre/8463db11f6763638e1687182122acf40 to your computer and use it in GitHub Desktop.

Select an option

Save ThomasAlexandre/8463db11f6763638e1687182122acf40 to your computer and use it in GitHub Desktop.
Distributed word count in Unison, similar to the Scio workshop example
-- Distributed Word Count Example
-- Based on the distributed word count from the Scala Scio library; see the Beam Summit workshop at https://www.youtube.com/watch?v=cGvaQp_h5ek&t=2s
-- Reads ./data/muestra.txt, counts words, outputs top 10 to ./data/wordcount-results.txt
-- Local (single-process) word count.
-- Reads ./data/muestra.txt, lower-cases it, tallies each whitespace-separated
-- word, and writes the 10 most frequent as "word: count" lines (joined with
-- "\n") to ./data/wordcount-results.txt, then prints a confirmation.
wordcount.run : '{IO, Exception} ()
wordcount.run = do
  use Text ++
  source = FilePath "./data/muestra.txt"
  destination = FilePath "./data/wordcount-results.txt"
  -- Read, normalise case, tokenize and tally in one pipeline.
  tally =
    source
      |> readFileUtf8
      |> Text.toLowercase
      |> splitOnWhitespace
      |> countWords
  -- Highest counts first: sort on the negated count, then keep the first 10.
  ranked = List.take 10 (List.sortBy (cases (_, n) -> Nat.negate n) (Map.toList tally))
  render = cases (w, n) -> w ++ ": " ++ Nat.toText n
  report = Text.join "\n" (List.map render ranked)
  writeFileUtf8 destination report
  printLine ("Top 10 words written to " ++ FilePath.toText destination)
-- Split text into words on whitespace, dropping empty strings.
-- Separators: newline, carriage return, tab and space. Carriage return is
-- included so that CRLF-terminated input does not leave a trailing "\r" on
-- the last word of each line; otherwise that word would be counted as a
-- different token from the same word elsewhere on a line.
-- Returns [] when txt is empty or contains only separators.
splitOnWhitespace : Text -> [Text]
splitOnWhitespace txt =
  txt
    |> Text.split ?\n
    |> List.flatMap (Text.split ?\r)
    |> List.flatMap (Text.split ?\t)
    |> List.flatMap (Text.split ? )
    |> List.filter (word -> Text.size word > 0)
-- Count occurrences of each word.
-- Returns a map from each distinct word in the list to how many times it
-- appears. Words not present in the input do not appear in the map.
countWords : [Text] -> Map Text Nat
countWords words =
  -- Add one occurrence of `word` to the running tally.
  bump tally word =
    seen = Optional.getOrElse 0 (Map.get word tally)
    Map.insert word (seen + 1) tally
  List.foldLeft bump Map.empty words
-- Negate a Nat as an Int. Used as a sort key so that ascending sortBy
-- yields descending counts.
Nat.negate : Nat -> Int
Nat.negate n = n |> Nat.toInt |> Int.negate
-- Distributed word count logic (runs on Cloud).
-- Lower-cases `content`, splits it with splitIntoChunks 4, and forks one
-- task per chunk at `loc`. Each task tallies the words of its chunk. All
-- tasks are awaited and their partial tallies are merged. Returns the 10
-- most frequent words as "word: count" lines joined with "\n".
wordcount.distributed.logic : Location {} -> Text ->{Remote, Exception} Text
wordcount.distributed.logic loc content =
  use Text ++
  -- Fan out: one remote task per chunk; the forked work is pure.
  spawn chunk = Remote.forkAt loc do countWords (splitOnWhitespace chunk)
  tasks = List.map spawn (splitIntoChunks 4 (Text.toLowercase content))
  -- Fan in: wait for every task in order, then combine the partial tallies.
  partialCounts : [Map Text Nat]
  partialCounts = List.map Remote.await tasks
  totals = List.foldLeft mergeCounts Map.empty partialCounts
  -- Highest counts first, keep 10, render one "word: count" per line.
  ranked = List.take 10 (List.sortBy (cases (_, n) -> Nat.negate n) (Map.toList totals))
  Text.join "\n" (List.map (cases (w, n) -> w ++ ": " ++ Nat.toText n) ranked)
-- Deploy to cloud.
-- Reads ./data/muestra.txt locally and submits wordcount.distributed.logic
-- to the default cloud environment. It writes the returned report to
-- ./data/wordcount-results.txt, prints a confirmation, and returns the report.
wordcount.distributed.deploy : '{IO, Exception} Text
wordcount.distributed.deploy = Cloud.main do
  use Text ++
  source = FilePath "./data/muestra.txt"
  destination = FilePath "./data/wordcount-results.txt"
  text = readFileUtf8 source
  -- Resolve the target environment, then run the job remotely.
  targetEnv = Environment.default()
  report = Cloud.submit targetEnv do
    where = here!
    wordcount.distributed.logic where text
  writeFileUtf8 destination report
  printLine ("Distributed word count complete. Results in " ++ FilePath.toText destination)
  report
-- Split text into at most n chunks holding roughly equal numbers of words.
-- Chunk boundaries always fall between words. The previous version cut at
-- raw character offsets, which split any word straddling a boundary into two
-- bogus tokens and corrupted the distributed counts. It also looped forever
-- when the text was shorter than n characters (chunk size 0), and divided by
-- zero when n was 0.
-- Words are tokenized with splitOnWhitespace and re-joined inside each chunk
-- with a single space. Any consumer that re-tokenizes with splitOnWhitespace
-- therefore sees exactly the same words.
--   n   - desired number of chunks; 0 is treated as 1
--   txt - input text
-- Returns [] when txt contains no words.
splitIntoChunks : Nat -> Text -> [Text]
splitIntoChunks n txt =
  ws = splitOnWhitespace txt
  total = List.size ws
  parts = if n == 0 then 1 else n
  -- Ceiling division (avoiding Nat subtraction): at most `parts` chunks, and
  -- at least one word per chunk whenever any word exists, so `go` always
  -- makes progress.
  floorSize = total / parts
  perChunk = if floorSize * parts < total then floorSize + 1 else floorSize
  go acc = cases
    [] -> acc
    rest ->
      chunk = Text.join " " (List.take perChunk rest)
      go (acc :+ chunk) (List.drop perChunk rest)
  go [] ws
-- Merge two word count maps.
-- Returns a map holding every word from either input. A word present in both
-- maps gets the sum of its two counts.
mergeCounts : Map Text Nat -> Map Text Nat -> Map Text Nat
mergeCounts m1 m2 =
  -- Fold one (word, count) entry of m2 into the accumulated map.
  addEntry = cases
    acc, (word, n) ->
      already = Optional.getOrElse 0 (Map.get word acc)
      Map.insert word (already + n) acc
  m2 |> Map.toList |> List.foldLeft addEntry m1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment