Skip to content

Instantly share code, notes, and snippets.

@genadyp
Last active February 3, 2026 07:02
Show Gist options
  • Select an option

  • Save genadyp/5b14f140ca9b74336ebbb987b883e5f2 to your computer and use it in GitHub Desktop.

Select an option

Save genadyp/5b14f140ca9b74336ebbb987b883e5f2 to your computer and use it in GitHub Desktop.
python useful links
@genadyp
Copy link
Author

genadyp commented Apr 17, 2023

Getting real-time output from a subprocess using asyncio

import asyncio
from asyncio.subprocess import PIPE
from asyncio import create_subprocess_exec


async def _read_stream(stream, callback):
    while True:
        line = await stream.readline()
        if line:
            callback(line)
        else:
            break


async def run(command):
    """Run *command* and print its stdout/stderr line by line as it is produced.

    Args:
        command: Sequence of program arguments (executable first). It is
            unpacked into ``create_subprocess_exec``, so it must be an argv
            list, not a single shell-style string.

    Returns:
        The exit code of the finished process.
    """
    process = await create_subprocess_exec(
        *command, stdout=PIPE, stderr=PIPE
    )

    # Drain both pipes concurrently so neither can fill up and stall the
    # child. gather() accepts coroutines directly; asyncio.wait() no longer
    # does (deprecated in 3.8, TypeError since 3.11).
    await asyncio.gather(
        _read_stream(
            process.stdout,
            lambda x: print("STDOUT: {}".format(x.decode("UTF8"))),
        ),
        _read_stream(
            process.stderr,
            lambda x: print("STDERR: {}".format(x.decode("UTF8"))),
        ),
    )

    # Both streams reached EOF; reap the process and report its exit code.
    return await process.wait()


async def main():
    """Build the Docker image from the current directory, streaming its output."""
    # run() unpacks its argument into create_subprocess_exec, so the command
    # has to be an argv list; a plain string would be split into
    # single-character arguments.
    await run(["docker", "build", "-t", "my-docker-image:latest", "."])


if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards; calling get_event_loop() with no running
    # loop is deprecated and fails on recent Python versions.
    asyncio.run(main())

@genadyp
Copy link
Author

genadyp commented Sep 12, 2023

How to launch an independent subprocess in Python

https://stackoverflow.com/a/42499675

#!/usr/bin/env python
import os
import sys
import platform
from subprocess import Popen, PIPE

# Pick the Popen keyword arguments that act as "start_new_session" on the
# current platform / Python version.
if platform.system() == 'Windows':
    # Win32 process-creation flag values, hard-coded here
    # (note: they could be taken from the subprocess module instead).
    DETACHED_PROCESS = 0x00000008
    CREATE_NEW_PROCESS_GROUP = 0x00000200
    kwargs = {
        "creationflags": DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP,  # 0x208
        "close_fds": True,
    }
elif sys.version_info >= (3, 2):
    # Python 3.2+ on a non-Windows system (assumed POSIX).
    kwargs = {"start_new_session": True}
else:
    # Older Python, assumed POSIX: call setsid() in the child before exec.
    kwargs = {"preexec_fn": os.setsid}

# Start the child with all three standard streams piped, adding the
# detach options collected in kwargs.
launch_options = dict(kwargs, stdin=PIPE, stdout=PIPE, stderr=PIPE)
p = Popen(["C"], **launch_options)
# poll() gives None while the child is still running, otherwise its exit
# status; the check passes for a running child or a zero exit status.
exit_status = p.poll()
assert not exit_status

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment