|
defmodule FromPipe do
  @moduledoc """
  Reads data from a named pipe through the external helper script
  `from_pipe_release`, which is run as a port.

  The processing loop writes a `ready` line to the port to request the next
  line and then waits for the port to deliver it. Releasing one line per
  request gives a crude backpressure mechanism.
  """

  @pipe_name "/tmp/testpipe"
  @from_pipe_release "./from_pipe_release"
  @from_pipe_clean "./from_pipe_clean"

  @typedoc "The port running the helper script, paired with its monitor reference."
  @type handle :: {port(), reference()}

  @doc """
  Runs the `from_pipe_clean` helper script.

  The script receives the expanded path of the release script and the pipe
  name. It is expected to terminate a leftover helper OS process and remove a
  leftover named pipe; the script itself lives outside this module.

  Returns the `{output, exit_status}` tuple produced by `System.cmd/2`.
  """
  @spec clean() :: {Collectable.t(), non_neg_integer()}
  def clean() do
    release_script = Path.expand(@from_pipe_release)
    clean_script = Path.expand(@from_pipe_clean)
    System.cmd(clean_script, [release_script, @pipe_name])
  end

  @doc """
  Sends `SIGINT` to the OS process `os_pid` via the `kill` command so the
  helper script can clean up and terminate.

  The outcome of `kill` is ignored; the function always returns `:ok`.
  """
  @spec sig_int(non_neg_integer()) :: :ok
  def sig_int(os_pid) do
    System.cmd("kill", ["-INT", Integer.to_string(os_pid)])
    :ok
  end

  @doc """
  Starts the release script as a port, passing it the pipe name, and monitors
  the port so the loop notices when the script terminates.

  Returns `{port, monitor_ref}`.
  """
  @spec start() :: handle()
  def start do
    script = Path.expand(@from_pipe_release)
    port = Port.open({:spawn_executable, script}, [:binary, args: [@pipe_name]])
    {port, Port.monitor(port)}
  end

  @doc """
  Demonitors and closes the port, then signals the helper OS process to
  terminate.

  Safe to call when the port has already closed: closing is skipped and no
  signal is sent if the OS pid is no longer known. Always returns `:ok`.
  """
  @spec stop(handle()) :: :ok
  def stop({port, ref}) do
    Port.demonitor(ref, [:flush])

    # The OS pid must be read before closing; it is unavailable afterwards.
    info = Port.info(port, :os_pid)
    close_port(port)

    # Interrupt the script, which may still be blocked reading the named pipe.
    case info do
      {:os_pid, os_pid} when is_integer(os_pid) -> sig_int(os_pid)
      _ -> :ok
    end
  end

  @doc """
  Data processing loop.

  Requests one line from the helper script, prints whatever the port delivers,
  and repeats. The loop ends with `:ok` when the port delivers a `quit` line
  (the port is then stopped via `stop/1`) or when the port terminates.
  """
  @spec loop(handle()) :: :ok
  def loop({port, ref} = args) do
    case request_line(port) do
      :ok ->
        await(args)

      :closed ->
        # The script is already gone; drop the pending :DOWN notification.
        Port.demonitor(ref, [:flush])
        :ok
    end
  end

  @doc """
  Cleans up leftovers from a previous session, starts the helper script and
  enters the processing loop.
  """
  @spec run() :: :ok
  def run() do
    clean()
    loop(start())
  end

  # Tells the script we are ready for the next line. Port.command/2 raises
  # ArgumentError when the port has already closed; report that as :closed.
  defp request_line(port) do
    Port.command(port, "ready\n")
    :ok
  rescue
    ArgumentError -> :closed
  end

  # Waits for the response to the outstanding "ready" request.
  defp await({port, ref} = args) do
    receive do
      {^port, {:data, "quit\n"}} ->
        stop(args)

      {^port, {:data, data}} ->
        IO.puts("port data: #{data}")
        loop(args)

      {:DOWN, ^ref, :port, ^port, _reason} ->
        # script was terminated
        :ok

      msg ->
        IO.puts("other: #{inspect(msg)}")
        # Keep waiting without issuing another "ready": one request is still
        # outstanding, and a second one would release an extra line.
        await(args)
    end
  end

  # Closes the port, tolerating the case where it has already closed.
  defp close_port(port) do
    Port.close(port)
    :ok
  rescue
    ArgumentError -> :ok
  end
end