Skip to content

Instantly share code, notes, and snippets.

@paulwinex
Last active August 11, 2025 19:44
Show Gist options
  • Save paulwinex/174ddb8f2900f25b6eed4d343d6e5187 to your computer and use it in GitHub Desktop.
Save paulwinex/174ddb8f2900f25b6eed4d343d6e5187 to your computer and use it in GitHub Desktop.
"""
Just run:
python3 custom-pipe-example.py
"""
import subprocess
import os
import json
import time
import sys
from pprint import pprint
def run_main_process():
read_fd, write_fd = os.pipe()
os.set_inheritable(write_fd, True)
process = subprocess.Popen(
[sys.executable, __file__, str(write_fd)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
close_fds=False
)
os.close(write_fd)
with os.fdopen(read_fd, 'r') as data_pipe:
try:
json_data = data_pipe.read()
if json_data:
received_json = json.loads(json_data)
print('DATA RECEIVED:')
pprint(received_json)
except json.JSONDecodeError as e:
print(f"JSON Decode error: {e}")
print(f"{json_data}")
stdout_log, stderr_log = process.communicate()
print("\nChild logs:")
print(stdout_log)
print("\nChild Errors:")
print(stderr_log)
def run_child_process(write_pipe_fd):
with os.fdopen(int(write_pipe_fd), 'w') as data_pipe:
data = {
"message": "Hello!!!",
"timestamp": time.time(),
"source": "custom_pipe_json"
}
json.dump(data, data_pipe)
data_pipe.flush()
print('SENT DATA:')
pprint(data)
if __name__ == '__main__':
args = sys.argv[1:]
if not args:
run_main_process()
else:
run_child_process(args[0])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment