Skip to content

Instantly share code, notes, and snippets.

@rlcamp
Last active October 18, 2023 04:57
Show Gist options
  • Save rlcamp/c1eeb46e72e2967fa4ff65f0cd64a4bd to your computer and use it in GitHub Desktop.
Save rlcamp/c1eeb46e72e2967fa4ff65f0cd64a4bd to your computer and use it in GitHub Desktop.
demo of a three thread generator-style pipeline where the middle thread is a child process, with which communication happens via its stdin and stdout
/* demo of a three-thread generator-style pipeline in which the middle stage is a child process,
with which the parent communicates via the child's stdin and stdout */
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include <pthread.h>
/* Spawn `path` (looked up via execvp) as a child process whose stdin and stdout are pipes
 connected to the caller.

 On success, returns the child's pid, stores in *fd_child_to_parent_p the read end of the pipe fed
 by the child's stdout, and stores in *fd_parent_to_child_p the write end of the pipe feeding the
 child's stdin. The caller owns both descriptors, and is expected to close them and to reap the
 child with waitpid(). If argv is NULL, the child is run with path as its only argument.

 If fork() fails, both pipes are closed, both output descriptors are set to -1, and -1 is returned
 with errno as left by fork(). Failure to create the pipes, or to rearrange them in the child,
 calls abort(). If the exec itself fails, the child prints a message to stderr and terminates with
 status EXIT_FAILURE, which the caller observes as eof on the read end and via waitpid(). */
pid_t popen_bidirectional(int * fd_child_to_parent_p, int * fd_parent_to_child_p,
                          const char * const path, char * const argv[]) {
    int fds_child_to_parent[2], fds_parent_to_child[2];
    if (-1 == pipe(fds_child_to_parent) || -1 == pipe(fds_parent_to_child)) abort();

    const pid_t pid = fork();

    if (-1 == pid) {
        /* no child exists, so do not hand back descriptors that lead nowhere. the closes are
         best-effort, and errno is preserved so the caller sees why fork() failed */
        const int errno_saved = errno;
        close(fds_child_to_parent[0]);
        close(fds_child_to_parent[1]);
        close(fds_parent_to_child[0]);
        close(fds_parent_to_child[1]);
        *fd_child_to_parent_p = -1;
        *fd_parent_to_child_p = -1;
        errno = errno_saved;
        return -1;
    }

    if (!pid) {
        /* child: make the pipe ends its stdout and stdin... */
        if (-1 == dup2(fds_child_to_parent[1], STDOUT_FILENO) ||
            -1 == dup2(fds_parent_to_child[0], STDIN_FILENO)) abort();

        /* ...then drop all four original descriptors, so that the only remaining references to
         the pipes within the child are its stdin and stdout */
        if (-1 == close(fds_child_to_parent[0]) || -1 == close(fds_parent_to_child[1]) ||
            -1 == close(fds_child_to_parent[1]) || -1 == close(fds_parent_to_child[0])) abort();

        execvp(path, argv ? argv : (char *[]){ (char *)path, NULL });

        /* execvp() only returns on failure */
        fprintf(stderr, "error: %s: execvp(%s): %s\n", __func__, path, strerror(errno));

        /* _exit() rather than exit(): this is still a copy of the parent, and must not run its
         atexit handlers or flush its duplicated stdio buffers */
        _exit(EXIT_FAILURE);
    }

    /* parent: close the ends that belong to the child, otherwise neither side would see eof */
    if (-1 == close(fds_child_to_parent[1]) || -1 == close(fds_parent_to_child[0])) abort();

    *fd_child_to_parent_p = fds_child_to_parent[0];
    *fd_parent_to_child_p = fds_parent_to_child[1];
    return pid;
}
/* Thread entry point which writes two lines of text into the child process's stdin.

 v points to an int holding the write end of the pipe feeding the child. This function wraps that
 descriptor in a stdio stream and closes it before returning, so the caller must not use or close
 the descriptor afterward. Always returns NULL; calls abort() if the stream cannot be opened,
 written or closed. */
static void * upstream(void * v) {
    const int fd_parent_to_child = *((int *)v);

    FILE * fh_parent_to_child = fdopen(fd_parent_to_child, "w");
    if (!fh_parent_to_child) abort();

    /* print some stuff into child's stdin */
    if (fprintf(fh_parent_to_child, "hello\n") < 0 ||
        fprintf(fh_parent_to_child, "also greetings\n") < 0) abort();

    /* closing is necessary, otherwise the child process will sit waiting for more input forever.
     it is also the point at which the buffered text is actually written, so a failure here means
     the child did not receive it */
    if (EOF == fclose(fh_parent_to_child)) abort();

    return NULL;
}
/* Run the demo: start awk as a child process, start a thread which feeds awk's stdin, echo each
 line awk writes to its stdout prefixed with "awk says: ", then join the thread and reap the child.
 Any failure along the way calls abort(). */
int main(void) {
    int fd_parent_to_child, fd_child_to_parent;
    const pid_t pid = popen_bidirectional(&fd_child_to_parent, &fd_parent_to_child,
        "awk", (char *[]){ "awk",
        "{ printf \"re: \\\"%s\\\", hi\\n\", $0 } END { printf \"bye\\n\" }", NULL });
    if (-1 == pid) abort();

    /* pthread functions report failure by returning a nonzero error number, never -1 */
    pthread_t pth;
    if (pthread_create(&pth, NULL, upstream, &fd_parent_to_child)) abort();

    FILE * fh_child_to_parent = fdopen(fd_child_to_parent, "r");
    if (!fh_child_to_parent) abort();

    /* and read lines from child's stdout until eof */
    char * line = NULL;
    size_t linecap = 0;
    while (getline(&line, &linecap, fh_child_to_parent) > 0) {
        /* getline() keeps the trailing newline, so trim it before printing */
        char * newline = strchr(line, '\n');
        if (newline) *newline = '\0';
        printf("awk says: %s\n", line);
    }
    free(line);

    /* cleanup, in reverse order of creation */
    if (pthread_join(pth, NULL)) abort();
    if (-1 == waitpid(pid, NULL, 0)) abort();
    if (EOF == fclose(fh_child_to_parent)) abort();

    return 0;
}
#!/usr/bin/env python3
# demo of a three thread generator-style pipeline where the middle thread is a child process, with
# which communication happens via its stdin and stdout, allowing for cross-process, cross-language
# bidirectional ipc satisfying structured concurrency semantics, when zero-copy ipc is not needed.
# (if zero-copy ipc IS needed, this can be combined with another technique shown elsewhere...)
# once again you don't get credit for pointing out this was obviously written by a c programmer
import os
import sys
import threading
import subprocess
def upstream(fd_parent_to_child):
    # this is the "grandchild" thread, which yields input to the child process's stdin.
    # fd_parent_to_child is the write end of the pipe feeding the child; this function takes
    # ownership of it and closes it before returning

    # the with block closes the pipe even if a write raises, so that the child always sees eof,
    # rather than relying on gc to close it after it goes out of scope
    with os.fdopen(fd_parent_to_child, 'w') as fh_parent_to_child:
        fh_parent_to_child.write('hello\n')
        fh_parent_to_child.write('also greetings\n')
def main():
    # start awk as a child process, feed its stdin from a separate thread, and echo everything it
    # writes to its stdout onto our stderr

    # start the child process
    args = ['awk', 'BEGIN { printf "hello to stderr\\n" > "/dev/stderr" } { printf "re: \\"%s\\", hi\\n", $0 } END { printf "bye\\n" }']
    child = subprocess.Popen(args, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    # give the grandchild thread its own duplicate of the stdin pipe and drop ours right away, so
    # that each descriptor has exactly one owner and the thread's close is what delivers eof
    fd_parent_to_child = os.dup(child.stdin.fileno())
    child.stdin.close()

    # start the grandchild thread within the parent process
    pth = threading.Thread(target=upstream, args=(fd_parent_to_child,))
    pth.start()

    # read lines from the child until it yields eof. closefd=False because child.stdout remains
    # the owner of the descriptor, and closing it from two file objects would be a double close
    with os.fdopen(child.stdout.fileno(), 'r', closefd=False) as fh_child_to_parent:
        for line in fh_child_to_parent:
            print('%s' % line.strip(), file=sys.stderr)

    # cleanup
    pth.join()
    child.stdout.close()
    child.wait()
# run the demo only when executed as a script, so that importing this file has no side effects
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment