Skip to content

Instantly share code, notes, and snippets.

@samspills
Created May 4, 2023 13:33
Show Gist options
  • Save samspills/029d0ec01cc829e61318b2b3c933c39f to your computer and use it in GitHub Desktop.
Save samspills/029d0ec01cc829e61318b2b3c933c39f to your computer and use it in GitHub Desktop.
//> using lib "co.fs2::fs2-core:3.5.0"
//> using lib "org.typelevel::cats-effect:3.4.6"
import fs2.Stream
/** Console experiments for watching how fs2 chunks flow through `map`,
  * `mapChunks` and a pipe applied with `through`.
  *
  * Every experiment starts from the same pure stream (see [[source]]), logs each
  * chunk before and after the transformation under test, and then compiles and
  * drains the stream. The `println` and `Thread.sleep` calls inside the
  * transformations are deliberate: they stand in for slow work so the ordering
  * of the log lines can be observed while the program runs.
  */
object StreamingChunks {

  /** Exclusive upper bound of the emitted range `0 until ElementCount`. */
  private val ElementCount = 100

  /** Number of elements the source is regrouped into per chunk. */
  private val ChunkSize = 10

  /** Pause, in milliseconds, taken inside each transformation call. */
  private val PauseMillis = 1000L

  /** The input shared by every experiment: the integers `0 until ElementCount`,
    * regrouped into chunks of `ChunkSize`, with each chunk logged as it is
    * pulled. Kept in one place so the three experiments cannot drift apart.
    */
  private def source: Stream[fs2.Pure, Int] =
    Stream
      .emits(0 until ElementCount)
      .chunkN(ChunkSize)
      .unchunks
      .debugChunks(c => s"Chunk before anything: $c")

  /** Runs the experiment with an element-wise `map`.
    *
    * The mapping function prints and sleeps once per element before adding
    * `0.1`, then the resulting chunks are logged and the stream is drained.
    * Declared without a parameter list to stay source-compatible with existing
    * call sites, even though it performs console output.
    */
  def mapCheck: Unit =
    source
      .map { i =>
        println(s"inside map, element before sleep: $i")
        Thread.sleep(PauseMillis)
        i + 0.1
      }
      .debugChunks(c => s"Chunk after everything $c")
      .compile
      .drain

  /** Runs the experiment with `mapChunks`.
    *
    * The function receives a whole chunk at a time, so it prints and sleeps once
    * per chunk before adding `0.1` to every element of that chunk. The resulting
    * chunks are logged and the stream is drained.
    */
  def mapChunksCheck: Unit =
    source
      .mapChunks { c =>
        println(s"inside map, chunk before sleep: $c")
        Thread.sleep(PauseMillis)
        c.map(_ + 0.1)
      }
      .debugChunks(c => s"Chunk after everything $c")
      .compile
      .drain

  /** Runs the experiment with the element-wise `map` wrapped in a pipe and
    * applied via `through`.
    *
    * The function inside the pipe receives a single element, so the log line
    * labels it as an element (the earlier wording called it a chunk).
    */
  def pipeCheck: Unit =
    source
      .through { st =>
        st.map { i =>
          println(s"inside map inside pipe, element before sleep: $i")
          Thread.sleep(PauseMillis)
          i + 0.1
        }
      }
      .debugChunks(c => s"Chunk after everything $c")
      .compile
      .drain

  /** Entry point: runs the `mapChunks` experiment. Swap in `mapCheck` or
    * `pipeCheck` here to watch the other variants.
    */
  def main(args: Array[String]): Unit = mapChunksCheck
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment