Created
February 25, 2021 00:14
-
-
Save markehammons/e5da16bd160a7fbabdac79b87e067238 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zio._ | |
import zio.stream._ | |
import scala.scalanative.unsafe._ | |
import scala.scalanative.posix.unistd.{ | |
fork, | |
vfork, | |
pipe, | |
close, | |
dup2, | |
STDIN_FILENO, | |
STDERR_FILENO, | |
STDOUT_FILENO, | |
execve, | |
_exit, | |
write, | |
read | |
} | |
import scala.scalanative.posix.sys.types.pid_t | |
import scala.scalanative.unsigned._ | |
import ops.lib.csys.Wait | |
import java.io.FileInputStream | |
/** Spawns external programs through POSIX `fork`/`vfork`/`execve` and exposes their combined
  * stdout/stderr as a ZIO `Stream` of bytes.
  *
  * Every call to `runProcess` creates two pipes and two processes:
  *   - a "feeder" process (plain `fork`) that evaluates the caller-supplied input stream and
  *     writes it into the subprocess's stdin pipe, then exits;
  *   - the actual subprocess (`vfork` + `execve`), whose stdin is the read end of the stdin pipe
  *     and whose stdout and stderr are both the write end of the output pipe.
  *
  * The calling process keeps only the read end of the output pipe.
  */
object Process {

  /** Runtime used inside the forked feeder process to evaluate the input stream. */
  val default = Runtime.default

  /** Size, in bytes, of the read buffer and of the groups written to the subprocess's stdin. */
  private val BufferSize = 1024

  /** Handle on a running process.
    *
    * NOTE(review): nothing else in this object implements or returns this trait.
    */
  trait ZProcess {
    val terminate: Task[Unit]
    val kill: Task[Unit]
    val exitValue: Task[Int]
    val outStream: Stream[Throwable, Byte]
    val isAlive: Task[Boolean]
  }

  /** Runs in the `vfork`ed child: wires the pipes onto the standard descriptors and replaces the
    * process image with `bin`.
    *
    * @param bin     path of the executable; also passed as `argv[0]`
    * @param args    remaining command-line arguments
    * @param inPipe  write end of the output pipe; becomes the child's stdout and stderr
    * @param outPipe read end of the stdin pipe; becomes the child's stdin
    * @return never returns normally: either `execve` succeeds or `_exit` is called
    */
  private def runSubprocess(
      bin: LPath,
      args: Seq[String],
      inPipe: CInt,
      outPipe: CInt
  ): Nothing = {
    val totalArgs = bin.toString() +: args
    Zone { implicit z =>
      // argv must be terminated by a null pointer, hence the extra slot.
      val cArgs = alloc[CString](totalArgs.length + 1)
      totalArgs.zipWithIndex.foreach { case (arg, i) =>
        cArgs(i) = toCString(arg)
      }
      // The terminator lives in the last allocated slot, i.e. index `length`.
      cArgs(totalArgs.length) = null

      dup2(outPipe, STDIN_FILENO)
      dup2(inPipe, STDOUT_FILENO)
      dup2(inPipe, STDERR_FILENO)
      // The originals are redundant once duplicated onto 0/1/2.
      close(inPipe)
      close(outPipe)

      // A null envp is passed, so the program is started without the caller's environment.
      val res = execve(cArgs(0), cArgs, null)
      // Only reached when execve failed. Nothing is printed here because stdout and stderr
      // already point at the output pipe and would pollute the data seen by the reader.
      _exit(res)
    }
    // `_exit` is typed as returning Unit; this line only satisfies the `Nothing` return type.
    ???
  }

  /** Runs in the forked feeder process: writes `inputStream` to `outPipe` in groups of
    * `BufferSize` bytes, then closes the descriptor and terminates the feeder with `_exit(0)`.
    *
    * Feeding stops early when a `write` does not accept the whole group.
    *
    * @param inputStream bytes to deliver to the subprocess's stdin
    * @param outPipe     write end of the stdin pipe
    * @return never returns normally; the feeder exits from the stream's finalizer
    */
  private def feedSubprocess(
      inputStream: Stream[Throwable, Byte],
      outPipe: CInt
  ): Nothing = {
    default.unsafeRunAsync {
      inputStream
        .grouped(BufferSize)
        .foreachWhile { group =>
          Task(
            Zone { implicit z =>
              // Copy the group into native memory so it can be handed to write(2).
              val heap = alloc[Byte](group.size)
              for (i <- 0 until group.size) {
                heap(i) = group(i)
              }
              val wrote = write(outPipe, heap, group.size.toULong)
              wrote == group.size
            }
          )
        }
        .ensuring(URIO {
          // Closing the write end is what lets the subprocess observe end-of-input.
          close(outPipe)
          _exit(0)
        })
    } { _ =>
      ()
    }
    // NOTE(review): reached only if unsafeRunAsync returns before the finalizer above has
    // called `_exit`; it then throws NotImplementedError inside the feeder process.
    ???
  }

  /** Builds the stream of everything the subprocess writes to the output pipe, followed by a
    * check of the status reported by `waitpid`.
    *
    * The pipe descriptor is closed when the reading part of the stream finishes. The stream
    * fails if `read` reports an error or if the wait status is non-zero.
    *
    * @param inPipe read end of the output pipe
    * @param fork2  pid of the subprocess to wait for once its output is exhausted
    */
  private def readFromSubprocess(
      inPipe: CInt,
      fork2: pid_t
  ): Stream[Throwable, Byte] = {
    val output: Stream[Throwable, Byte] =
      Stream
        .unfoldChunkM(()) { _ =>
          Task {
            val buf = stackalloc[Byte](BufferSize)
            val rd = read(inPipe, buf, BufferSize.toULong).toInt
            if (rd < 0) {
              throw new Exception(s"failed to read subprocess output from descriptor $inPipe")
            } else if (rd == 0) {
              // End of file: every write end of the pipe has been closed.
              None
            } else {
              val cb = ChunkBuilder.make[Byte](rd)
              var i = 0
              while (i < rd) {
                cb += buf(i)
                i += 1
              }
              Some((cb.result(), ()))
            }
          }
        }
        .ensuring(UIO {
          close(inPipe)
        })

    val exitCheck: Stream[Throwable, Byte] =
      Stream
        .fromEffect(Task {
          val status = stackalloc[CInt](1)
          // Seed with a non-zero value so a waitpid call that never fills the slot is not
          // mistaken for a successful exit.
          !status = -1
          Wait.waitpid(fork2, status, 0)
          !status
        })
        .flatMap(result =>
          // `result` is the raw wait status, not a decoded exit code.
          if (result == 0) Stream.empty
          else Stream.fail(new Exception(s"process failed: ${result}"))
        )

    output.concat(exitCheck)
  }

  /** Creates the pipes and the feeder/subprocess pair for `bin` and returns the output stream.
    *
    * Must be called inside an effect: it forks the current process and throws when `pipe`,
    * `fork` or `vfork` fail. Only the original process ever returns from this method.
    *
    * NOTE(review): the feeder's pid is not kept, so the feeder is never waited on.
    */
  private def spawn(
      bin: LPath,
      args: Seq[String],
      inputStream: Stream[Throwable, Byte]
  ): Stream[Throwable, Byte] = {
    val inPipes = stackalloc[CInt](2)
    val outPipes = stackalloc[CInt](2)

    if (pipe(inPipes) != 0) {
      throw new Exception("could not create the subprocess output pipe")
    }
    if (pipe(outPipes) != 0) {
      close(inPipes(0))
      close(inPipes(1))
      throw new Exception("could not create the subprocess input pipe")
    }

    // inPipes carries subprocess -> caller, outPipes carries feeder -> subprocess.
    val fromChildRead = inPipes(0)
    val fromChildWrite = inPipes(1)
    val toChildRead = outPipes(0)
    val toChildWrite = outPipes(1)

    def closeAll(): Unit = {
      close(fromChildRead)
      close(fromChildWrite)
      close(toChildRead)
      close(toChildWrite)
    }

    val fork1 = fork()
    if (fork1 < 0) {
      closeAll()
      throw new Exception("could not fork the input feeder process")
    } else if (fork1 == 0) {
      // Feeder: keeps only the write end of the stdin pipe.
      close(fromChildRead)
      close(fromChildWrite)
      close(toChildRead)
      feedSubprocess(inputStream, toChildWrite)
    } else {
      val fork2 = vfork()
      if (fork2 < 0) {
        closeAll()
        throw new Exception("could not fork the subprocess")
      } else if (fork2 == 0) {
        // Subprocess: keeps the read end of stdin and the write end of the output pipe.
        close(fromChildRead)
        close(toChildWrite)
        runSubprocess(bin, args, fromChildWrite, toChildRead)
      } else {
        // Caller: keeps only the read end of the output pipe, so that end-of-file is
        // observed as soon as the subprocess (and feeder) have closed their copies.
        close(toChildRead)
        close(toChildWrite)
        close(fromChildWrite)
        readFromSubprocess(fromChildRead, fork2)
      }
    }
  }

  /** Process-spawning capability. */
  trait Service {

    /** Resolves `bin` to a concrete path.
      *
      * @param path directories to search when `bin` is neither absolute nor starting with "."
      * @param bin  executable name or path
      * @return the resolved path, or `None` when no directory in `path` contains an executable
      *         `bin`
      */
    def getBinaryPath(path: Iterable[LPath], bin: LPath): Option[LPath]

    /** Starts `bin` with `args`, feeding it `inputStream` on stdin.
      *
      * @return an effect producing the stream of the program's stdout and stderr; the stream
      *         fails when the program's wait status is non-zero
      */
    def runProcess(
        bin: LPath,
        args: String*
    )(
        inputStream: Stream[Throwable, Byte] = Stream.empty
    ): Task[Stream[Throwable, Byte]]
  }

  /** `Service` implementation that looks executables up on the `PATH` variable read from `env`. */
  case class ProcessImpl(env: zio.system.System.Service) extends Service {

    def getBinaryPath(path: Iterable[LPath], bin: LPath): Option[LPath] = {
      if (bin.isAbsolute || bin.startsWith(".")) {
        // Explicit paths are used as given, without checking that they exist.
        Some(bin)
      } else {
        path.map(_ / bin).find(_.isExecutable)
      }
    }

    def runProcess(
        bin: LPath,
        args: String*
    )(
        inputStream: Stream[Throwable, Byte] = Stream.empty
    ): Task[Stream[Throwable, Byte]] =
      for {
        path <- env
          .env("PATH")
          .someOrFail(new Exception("PATH is undefined"))
        resolved <- Task(getBinaryPath(path.split(":").map(LPath(_)), bin))
          .someOrFail(new Exception(s"Could not find executable $bin"))
        // Forking happens inside Task so that pipe/fork errors become ordinary failures.
        output <- Task(spawn(resolved, args, inputStream))
      } yield output
  }

  /** Layer providing `Process.Service` on top of the ZIO system service. */
  val live: ZLayer[Has[zio.system.System.Service], Throwable, Has[
    Process.Service
  ]] =
    ZLayer.fromService(ProcessImpl)

  /** Accessor that runs `bin` through the `Process.Service` found in the environment. */
  def runProcess(bin: LPath, args: String*)(
      inputStream: Stream[Throwable, Byte] = Stream.empty
  ): RIO[Has[Process.Service], Stream[Throwable, Byte]] = {
    for {
      ps <- RIO.environment[Has[Process.Service]]
      res <- ps.get[Process.Service].runProcess(bin, args: _*)(inputStream)
    } yield res
  }
}
// Usage: send "hello\n" to `cat`, pipe that process's output stream into a second `cat`,
// and print every chunk the second process produces.
val t = Process
  .runProcess(l"cat")(Stream.fromChunk(Chunk.fromArray("hello\n".getBytes())))
  .flatMap(firstOutput => Process.runProcess(l"cat")(firstOutput))
  .flatMap(secondOutput =>
    secondOutput.foreachChunk(bytes => putStr(new String(bytes.toArray)))
  )
  .map(_ => ())

// Run the program with the live Process layer and report how it ended.
runtime.unsafeRunAsync(t.provideSomeLayer[ZEnv](Process.live)) { exit =>
  exit match {
    case zio.Exit.Success(_)     => println("complete")
    case zio.Exit.Failure(cause) => println(cause.prettyPrint)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment