Skip to content

Instantly share code, notes, and snippets.

@schaumb
Last active October 23, 2024 15:25
Show Gist options
  • Save schaumb/037f139035d93cff3ad9f4f7e5f739ce to your computer and use it in GitHub Desktop.
Save schaumb/037f139035d93cff3ad9f4f7e5f739ce to your computer and use it in GitHub Desktop.
streamlit redirect
import streamlit as st
import io
import contextlib
import sys
import re
import threading
class _Redirect:
class IOStuff(io.StringIO):
def __init__(self, trigger, max_buffer, buffer_separator, regex, dup, need_dup, on_thread):
super().__init__()
self._trigger = trigger
self._max_buffer = max_buffer
self._buffer_separator = buffer_separator
self._regex = regex and re.compile(regex)
self._dup = dup
self._need_dup = need_dup
self._on_thread = on_thread
def write(self, __s: str) -> int:
res = None
if self._on_thread == threading.get_ident():
if self._max_buffer:
concatenated_len = super().tell() + len(__s)
if concatenated_len > self._max_buffer:
rest = self.get_filtered_output()[concatenated_len - self._max_buffer:]
if self._buffer_separator is not None:
rest = rest.split(self._buffer_separator, 1)[-1]
super().seek(0)
super().write(rest)
super().truncate(super().tell() + len(__s))
res = super().write(__s)
self._trigger(self.get_filtered_output())
if self._on_thread != threading.get_ident() or self._need_dup:
self._dup.write(__s)
return res
def get_filtered_output(self):
if self._regex is None or self._buffer_separator is None:
return self.getvalue()
return self._buffer_separator.join(filter(self._regex.search, self.getvalue().split(self._buffer_separator)))
def print_at_end(self):
self._trigger(self.get_filtered_output())
def __init__(self, stdout=None, stderr=False, format=None, to=None, max_buffer=None, buffer_separator='\n',
regex=None, duplicate_out=False):
self.io_args = {'trigger': self._write, 'max_buffer': max_buffer, 'buffer_separator': buffer_separator,
'regex': regex, 'on_thread': threading.get_ident()}
self.redirections = []
self.st = None
self.stderr = stderr is True
self.stdout = stdout is True or (stdout is None and not self.stderr)
self.format = format or 'code'
self.to = to
self.fun = None
self.duplicate_out = duplicate_out or None
self.active_nested = None
if not self.stdout and not self.stderr:
raise ValueError("one of stdout or stderr must be True")
if self.format not in ['text', 'markdown', 'latex', 'code', 'write']:
raise ValueError(
f"format need oneof the following: {', '.join(['text', 'markdown', 'latex', 'code', 'write'])}")
if self.to and (not hasattr(self.to, 'text') or not hasattr(self.to, 'empty')):
raise ValueError(f"'to' is not a streamlit container object")
def __enter__(self):
if self.st is not None:
if self.to is None:
if self.active_nested is None:
self.active_nested = self(format=self.format, max_buffer=self.io_args['max_buffer'],
buffer_separator=self.io_args['buffer_separator'],
regex=self.io_args['regex'], duplicate_out=self.duplicate_out)
return self.active_nested.__enter__()
else:
raise Exception("Already entered")
to = self.to or st
to.text(f"Redirected output from "
f"{'stdout and stderr' if self.stdout and self.stderr else 'stdout' if self.stdout else 'stderr'}"
f"{' [' + self.io_args['regex'] + ']' if self.io_args['regex'] else ''}"
f":")
self.st = to.empty()
self.fun = getattr(self.st, self.format)
io_obj = None
def redirect(to_duplicate, context_redirect):
nonlocal io_obj
io_obj = _Redirect.IOStuff(need_dup=self.duplicate_out and True, dup=to_duplicate, **self.io_args)
redirection = context_redirect(io_obj)
self.redirections.append((redirection, io_obj))
redirection.__enter__()
if self.stderr:
redirect(sys.stderr, contextlib.redirect_stderr)
if self.stdout:
redirect(sys.stdout, contextlib.redirect_stdout)
return io_obj
def __call__(self, to=None, format=None, max_buffer=None, buffer_separator='\n', regex=None, duplicate_out=False):
return _Redirect(self.stdout, self.stderr, format=format, to=to, max_buffer=max_buffer,
buffer_separator=buffer_separator, regex=regex, duplicate_out=duplicate_out)
def __exit__(self, *exc):
if self.active_nested is not None:
nested = self.active_nested
if nested.active_nested is None:
self.active_nested = None
return nested.__exit__(*exc)
res = None
for redirection, io_obj in reversed(self.redirections):
res = redirection.__exit__(*exc)
io_obj.print_at_end()
self.redirections = []
self.st = None
self.fun = None
return res
def _write(self, data):
self.fun(data)
stdout = _Redirect()
stderr = _Redirect(stderr=True)
stdouterr = _Redirect(stdout=True, stderr=True)
'''
# can be used as
import time
import sys
from random import getrandbits
import streamlit.redirect as rd
st.text('Suboutput:')
so = st.empty()
with rd.stdout, rd.stderr(format='markdown', to=st.sidebar):
print("hello ")
time.sleep(1)
i = 5
while i > 0:
print("**M**izu? ", file=sys.stdout if getrandbits(1) else sys.stderr)
i -= 1
with rd.stdout(to=so):
print(f" cica {i}")
if i:
time.sleep(1)
# '''
@tojo17
Copy link

tojo17 commented Oct 30, 2023

Hi @schaumb, thanks a lot for this script. But it seems not working properly for stderr.
If stderr is enabled after stdout, stdout will be actually redirected to where stderr is supposed to while stderr will not be displayed.
A simple fix may be possible by changeing Line 91:
from

        def redirect(to_duplicate):
            nonlocal io_obj
            io_obj = _Redirect.IOStuff(dup=self.duplicate_out and to_duplicate, **self.io_args)
            redirection = contextlib.redirect_stdout(io_obj)
            self.redirections.append((redirection, io_obj))
            redirection.__enter__()

to

        def redirect(to_duplicate):
            nonlocal io_obj
            io_obj = _Redirect.IOStuff(dup=self.duplicate_out and to_duplicate, **self.io_args)
            if to_duplicate == sys.stdout:
                redirection = contextlib.redirect_stdout(io_obj)
            else:
                redirection = contextlib.redirect_stderr(io_obj)
            self.redirections.append((redirection, io_obj))
            redirection.__enter__()

Sorry if I am wrong, and thank you again for your works!

@schaumb
Copy link
Author

schaumb commented Oct 31, 2023

Hi @schaumb, thanks a lot for this script. But it seems not working properly for stderr. If stderr is enabled after stdout, stdout will be actually redirected to where stderr is supposed to while stderr will not be displayed.

You are right @toshichi , this was a bug, and I fixed it. Thank you for your contribution!

@SidJain1412
Copy link

SidJain1412 commented Nov 1, 2023

@schaumb Very helpful gist! Thank you.
One question: If I use this across multiple streamlit sessions, it is possible that the output of one session goes to another right? Is there any way to overcome this?

Edit: I checked, and it does happen

@schaumb
Copy link
Author

schaumb commented Nov 2, 2023

Hi @SidJain1412 , can you create a small code where this bug can be reproduced?

@SidJain1412
Copy link

Here's an example @schaumb

import time
import streamlit as st
import redirect as rd
import random


possible = ["1234", "5678", "wxyz", "#*%^", "abcd", "efgh", "ijkl"]

def streaming_function():
    s = random.choice(possible)
    for i in range(len(s)):
        print(s[i])
        time.sleep(0.4)


st.text('Suboutput:')
so = st.empty()
with rd.stdout(to=so):
    streaming_function()

If you run this using streamlit run ... in multiple tabs simultaneously, you'll see these two things happen

  • Outputs getting mixed up
  • Type errors

@schaumb
Copy link
Author

schaumb commented Nov 8, 2023

Thanks @SidJain1412 .

I modified the gist with the following change: From now only the same thread's stdout is redirected, others are passed towards. This solves the multi-session problem.

@schaumb
Copy link
Author

schaumb commented Nov 9, 2023

Other problems/features

  • The subthread logs are not sent to streamlit (previously it showed on the browser)
  • When 2 tabs concurrently redirect the stdout/err but do not finish in the opposite order, the redirections are mixed

@SidJain1412
Copy link

Thanks @SidJain1412 .

I modified the gist with the following change: From now only the same thread's stdout is redirected, others are passed towards. This solves the multi-session problem.

Awesome! Thanks a lot :D

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment