Skip to content

Instantly share code, notes, and snippets.

@toby5box
Last active January 17, 2025 03:33
Show Gist options
  • Save toby5box/f4901d1b50532358caf7580d0a962908 to your computer and use it in GitHub Desktop.
Save toby5box/f4901d1b50532358caf7580d0a962908 to your computer and use it in GitHub Desktop.
fs2 TCP server on multple ports
import fs2.io.net
import fs2.*
import cats.effect.*
import cats.implicits.*
import cats.*
import com.comcast.ip4s.*
import fs2.io.net.Network
import java.nio.charset.StandardCharsets
object MainMinimal extends IOApp {
final val CR: Byte = 0x0d.toByte
private def lines(in: Stream[IO,Byte]): Stream[IO, String] = {
def read(s: Stream[IO,Byte], acc: List[Byte]): Pull[IO,String,Unit] = {
s.pull.uncons1.flatMap(
_.fold(
Pull.done
) {
case (`CR`,rest) => // emit collected bytes as a String
Pull.output1(new String(acc.reverse.toArray, StandardCharsets.US_ASCII))
>> read(rest, Nil)
case (b,rest) =>
read(rest, b :: acc)
}
)
}
read(in, Nil).stream
}
private def listen: Stream[IO,String] = {
val host1 = Host.fromString("127.0.0.1")
Stream.exec(IO.println(s"Starting server")) ++
Stream.emits(List((host1,Port.fromInt(8193)), (host1,Port.fromInt(8194))))
.map {
case (host, port) =>
Stream.exec(IO.println(s"Listening on $host, $port")) ++
Network[IO].server(address = host, port = port)
.map { skt =>
Stream.exec(IO.println(s"+++ Connection on $host, $port")) ++
skt.reads
.through(lines)
.evalTap(s => IO.println(s"xxxxxx"))
.evalTap(s => IO.println(s"$host # $port : $s"))
.handleErrorWith(t => Stream.exec(IO.println(s"Read Error: $t"))) ++
Stream.exec(IO.println(s"--- Disconnection from $host, $port"))
}.parJoinUnbounded
}
.parJoinUnbounded
}
override def run(args: List[String]): IO[ExitCode] = listen.compile.drain.as(ExitCode.Success)
}
@toby5box
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment