Created
January 16, 2026 09:07
-
-
Save ThomasAlexandre/8463db11f6763638e1687182122acf40 to your computer and use it in GitHub Desktop.
Distributed wordcount in unison - similar to scio workshop example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Distributed Word Count Example
-- Original distributed wordcount from Scala Scio lib, see Beam Summit Workshop at https://www.youtube.com/watch?v=cGvaQp_h5ek&t=2s

-- Local (single-process) word count.
-- Reads ./data/muestra.txt, lowercases it, counts whitespace-separated
-- words, and writes the 10 most frequent ones (one "word: count" per line)
-- to ./data/wordcount-results.txt, then prints a confirmation line.
wordcount.run : '{IO, Exception} ()
wordcount.run = do
  use Text ++
  source = FilePath "./data/muestra.txt"
  destination = FilePath "./data/wordcount-results.txt"
  -- Sort key: negated count, so an ascending sort yields descending counts.
  descendingByCount = cases (_, n) -> Nat.negate n
  -- One output line per (word, count) pair.
  renderEntry = cases (w, n) -> w ++ ": " ++ Nat.toText n
  -- Read, normalise case, tokenise, and tally.
  tally = countWords (splitOnWhitespace (Text.toLowercase (readFileUtf8 source)))
  -- Keep only the 10 most frequent words.
  ranked = List.take 10 (List.sortBy descendingByCount (Map.toList tally))
  report = Text.join "\n" (List.map renderEntry ranked)
  writeFileUtf8 destination report
  printLine ("Top 10 words written to " ++ FilePath.toText destination)
-- Split text into words on whitespace, dropping empty fragments.
--
-- Separators are newline, carriage return, tab and space. Carriage return
-- is included so that files with Windows (CRLF) line endings do not yield
-- tokens such as "word\r", which would be counted separately from "word".
-- Word order is preserved; consecutive separators produce no empty words.
splitOnWhitespace : Text -> [Text]
splitOnWhitespace txt =
  separators = [?\n, ?\r, ?\t, ?\s]
  -- Re-split every piece produced so far on one more separator.
  splitAll pieces sep = List.flatMap (Text.split sep) pieces
  List.foldLeft splitAll [txt] separators
    |> List.filter (word -> Text.size word > 0)
-- Tally how many times each word appears in the given list.
-- Returns a map from word to its number of occurrences; an empty list
-- yields an empty map.
countWords : [Text] -> Map Text Nat
countWords words =
  -- Fold step: record one more sighting of `word` in the running tally.
  bump tally word =
    match Map.get word tally with
      None -> Map.insert word 1 tally
      Some seen -> Map.insert word (seen + 1) tally
  List.foldLeft bump Map.empty words
-- Convert a Nat to an Int and flip its sign.
-- Used as a sort key so that an ascending sort orders counts descending.
Nat.negate : Nat -> Int
Nat.negate n = n |> Nat.toInt |> Int.negate
-- Distributed word count logic (runs on Cloud).
--
-- Lowercases `content`, cuts it into chunks with `splitIntoChunks 4`,
-- forks one counting task per chunk at `loc`, awaits every task, merges
-- the partial tallies with `mergeCounts`, and returns the 10 most frequent
-- words rendered as "word: count" lines joined by newlines.
wordcount.distributed.logic : Location {} -> Text ->{Remote, Exception} Text
wordcount.distributed.logic loc content =
  use Text ++
  -- Sort key: negated count, so an ascending sort yields descending counts.
  descendingByCount = cases (_, n) -> Nat.negate n
  -- One output line per (word, count) pair.
  renderEntry = cases (w, n) -> w ++ ": " ++ Nat.toText n
  -- Fork a task that counts the words of a single chunk.
  countChunk piece = Remote.forkAt loc do
    countWords (splitOnWhitespace piece)
  pieces = splitIntoChunks 4 (Text.toLowercase content)
  -- Start every task before awaiting any of them.
  pending = List.map countChunk pieces
  partials : [Map Text Nat]
  partials = List.map Remote.await pending
  -- Combine the per-chunk tallies into one.
  totals = List.foldLeft mergeCounts Map.empty partials
  ranked = List.take 10 (List.sortBy descendingByCount (Map.toList totals))
  Text.join "\n" (List.map renderEntry ranked)
-- Deploy to cloud.
--
-- Reads ./data/muestra.txt locally, submits `wordcount.distributed.logic`
-- to the default environment, writes the returned report to
-- ./data/wordcount-results.txt, prints a confirmation line, and returns
-- the report text.
wordcount.distributed.deploy : '{IO, Exception} Text
wordcount.distributed.deploy = Cloud.main do
  use Text ++
  source = FilePath "./data/muestra.txt"
  destination = FilePath "./data/wordcount-results.txt"
  text = readFileUtf8 source
  -- Get environment and run distributed logic
  environment = Environment.default()
  report = Cloud.submit environment do
    -- Location at which the per-chunk tasks are forked.
    where' = here!
    wordcount.distributed.logic where' text
  writeFileUtf8 destination report
  printLine ("Distributed word count complete. Results in " ++ FilePath.toText destination)
  report
-- Split text into roughly `n` chunks without cutting any word in half.
--
-- Each chunk is about `ceil (size / n)` characters long, then extended up
-- to (not including) the next whitespace character so that a word never
-- straddles two chunks. Concatenating the returned chunks reproduces `txt`
-- exactly.
--
-- Edge cases:
--   * empty `txt` yields [].
--   * `n` of 0 is treated as 1 (avoids a division by zero).
--   * when `txt` is shorter than `n` the chunk size is clamped to 1, so the
--     loop always makes progress (a chunk size of 0 would never terminate).
splitIntoChunks : Nat -> Text -> [Text]
splitIntoChunks n txt =
  use Text ++
  isBreak c = (c == ?\s) || ((c == ?\n) || ((c == ?\t) || (c == ?\r)))
  parts = if n == 0 then 1 else n
  size = Text.size txt
  -- Ceiling division, written without subtraction on Nat.
  quotient = size / parts
  rounded = if Nat.mod size parts == 0 then quotient else quotient + 1
  -- Never allow a zero-length step.
  chunkSize = if rounded == 0 then 1 else rounded
  go acc remaining =
    if Text.size remaining == 0 then acc
    else
      front = Text.take chunkSize remaining
      rest = Text.drop chunkSize remaining
      -- Pull the tail of a word cut by the boundary into this chunk.
      overflow = Text.takeWhile (c -> not (isBreak c)) rest
      go (acc :+ (front ++ overflow)) (Text.drop (Text.size overflow) rest)
  go [] txt
-- Combine two word-count maps into one.
-- Counts for words present in both maps are added together; words present
-- in only one map keep their count.
mergeCounts : Map Text Nat -> Map Text Nat -> Map Text Nat
mergeCounts m1 m2 =
  -- Fold step: add one (word, count) entry from m2 into the running totals.
  addEntry totals = cases
    (word, extra) ->
      match Map.get word totals with
        None -> Map.insert word (0 + extra) totals
        Some prior -> Map.insert word (prior + extra) totals
  List.foldLeft addEntry m1 (Map.toList m2)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment