Skip to content

Instantly share code, notes, and snippets.

@schaumb
Last active October 23, 2024 15:25
Show Gist options
  • Save schaumb/037f139035d93cff3ad9f4f7e5f739ce to your computer and use it in GitHub Desktop.
Save schaumb/037f139035d93cff3ad9f4f7e5f739ce to your computer and use it in GitHub Desktop.
streamlit redirect
import streamlit as st
import io
import contextlib
import sys
import re
import threading
class _Redirect:
    """Context manager that mirrors ``sys.stdout`` / ``sys.stderr`` into a Streamlit element.

    On entry a caption and an empty placeholder are created in ``to`` (or in the
    top-level ``st`` module when ``to`` is not given).  Every write performed by
    the thread that entered the context is collected in an in-memory buffer and
    the placeholder is re-rendered with the buffer's (optionally filtered)
    content.  Writes coming from any other thread are passed through to the
    stream that was active before the redirection.

    Calling an instance returns a new, independently configured ``_Redirect``
    for the same stream selection, e.g. ``rd.stdout(to=container, format='text')``.

    Args:
        stdout: ``True`` to capture stdout.  ``None`` (default) captures stdout
            unless ``stderr`` is ``True``.
        stderr: ``True`` to capture stderr.
        format: Name of the placeholder method used for rendering; one of
            ``'text'``, ``'markdown'``, ``'latex'``, ``'code'`` (default), ``'write'``.
        to: Object providing ``text()`` and ``empty()`` that hosts the output.
        max_buffer: Approximate maximum number of buffered characters; older
            content is dropped first.  ``None``/``0`` means unlimited.
        buffer_separator: Unit separator used for trimming and regex filtering.
        regex: Only units (as split by ``buffer_separator``) matching this
            pattern are rendered.  ``None`` or ``''`` disables filtering.
        duplicate_out: Also forward captured writes to the original stream.

    Raises:
        ValueError: If neither stream is selected, ``format`` is unknown, or
            ``to`` lacks ``text``/``empty``.
    """

    # Placeholder method names accepted for the ``format`` argument.
    _FORMATS = ('text', 'markdown', 'latex', 'code', 'write')

    class IOStuff(io.StringIO):
        """String buffer that reports its content through ``trigger`` after each write."""

        def __init__(self, trigger, max_buffer, buffer_separator, regex, dup, need_dup, on_thread):
            """Store the buffer configuration.

            Args:
                trigger: Callable invoked with the filtered buffer content.
                max_buffer: Approximate character limit of the buffer (falsy: unlimited).
                buffer_separator: Separator between units; ``None``/``''`` disables
                    unit-wise trimming and regex filtering.
                regex: Pattern (string or compiled) a unit must match to be shown.
                dup: Stream that receives pass-through / duplicated writes.
                need_dup: Forward captured writes to ``dup`` as well.
                on_thread: Identifier of the only thread whose writes are captured.
            """
            super().__init__()
            self._trigger = trigger
            self._max_buffer = max_buffer
            # ``str.split('')`` raises ValueError, so an empty separator means "none".
            self._buffer_separator = buffer_separator or None
            # Store either a compiled pattern or None; the previous
            # ``regex and re.compile(regex)`` kept ``''`` itself for an empty
            # pattern, which later crashed on ``.search``.
            self._regex = re.compile(regex) if regex else None
            self._dup = dup
            self._need_dup = need_dup
            self._on_thread = on_thread

        def write(self, __s: str) -> int:
            """Buffer ``__s`` (owner thread) and/or forward it to the original stream.

            Returns:
                The number of characters written, also for pass-through writes.
            """
            captured = self._on_thread == threading.get_ident()
            written = None
            if captured:
                if self._max_buffer:
                    self._make_room(len(__s))
                written = super().write(__s)
                self._trigger(self.get_filtered_output())
            if not captured or self._need_dup:
                forwarded = self._dup.write(__s)
                if written is None:
                    written = forwarded
            # Always honour the ``-> int`` contract, even if ``dup.write`` returned None.
            return len(__s) if written is None else written

        def _make_room(self, incoming):
            """Drop the oldest buffered content so ``incoming`` more characters fit."""
            total = self.tell() + incoming
            if total <= self._max_buffer:
                return
            kept = self.get_filtered_output()[total - self._max_buffer:]
            if self._buffer_separator is not None:
                # Skip the (probably cut) first unit so the buffer restarts on a boundary.
                kept = kept.split(self._buffer_separator, 1)[-1]
            self.seek(0)
            self.truncate()
            super().write(kept)

        def get_filtered_output(self):
            """Return the buffer content, reduced to regex-matching units if configured."""
            content = self.getvalue()
            if self._regex is None or self._buffer_separator is None:
                return content
            sep = self._buffer_separator
            return sep.join(unit for unit in content.split(sep) if self._regex.search(unit))

        def print_at_end(self):
            """Render the final buffer content once more."""
            self._trigger(self.get_filtered_output())

    def __init__(self, stdout=None, stderr=False, format=None, to=None, max_buffer=None, buffer_separator='\n',
                 regex=None, duplicate_out=False):
        # The owner thread is deliberately NOT stored here: instances may be
        # created on one thread (e.g. at import time) and entered on another.
        self.io_args = {'trigger': self._write, 'max_buffer': max_buffer, 'buffer_separator': buffer_separator,
                        'regex': regex}
        self.redirections = []
        self.st = None
        self.stderr = stderr is True
        self.stdout = stdout is True or (stdout is None and not self.stderr)
        self.format = format or 'code'
        self.to = to
        self.fun = None
        self.duplicate_out = duplicate_out or None
        self.active_nested = None

        if not self.stdout and not self.stderr:
            raise ValueError("one of stdout or stderr must be True")

        if self.format not in self._FORMATS:
            raise ValueError(
                f"format need oneof the following: {', '.join(self._FORMATS)}")

        if self.to and not (hasattr(self.to, 'text') and hasattr(self.to, 'empty')):
            raise ValueError("'to' is not a streamlit container object")

    def __enter__(self):
        """Start redirecting and return the buffer of the last redirected stream.

        Re-entering an already active instance that has no explicit ``to``
        transparently enters a nested redirector with the same settings.

        Raises:
            RuntimeError: If an instance bound to an explicit ``to`` is re-entered.
        """
        if self.st is not None:
            if self.to is not None:
                raise RuntimeError("Already entered")
            if self.active_nested is None:
                self.active_nested = self(format=self.format, max_buffer=self.io_args['max_buffer'],
                                          buffer_separator=self.io_args['buffer_separator'],
                                          regex=self.io_args['regex'], duplicate_out=self.duplicate_out)
            return self.active_nested.__enter__()

        to = self.to or st

        if self.stdout and self.stderr:
            source = 'stdout and stderr'
        elif self.stdout:
            source = 'stdout'
        else:
            source = 'stderr'
        regex = self.io_args['regex']
        # Works for both string and pre-compiled patterns.
        pattern = getattr(regex, 'pattern', regex)
        suffix = f" [{pattern}]" if regex else ''
        to.text(f"Redirected output from {source}{suffix}:")

        self.st = to.empty()
        self.fun = getattr(self.st, self.format)

        targets = []
        if self.stderr:
            targets.append((sys.stderr, contextlib.redirect_stderr))
        if self.stdout:
            targets.append((sys.stdout, contextlib.redirect_stdout))

        # Capture only the thread that actually enters the context.
        owner = threading.get_ident()
        io_obj = None
        for to_duplicate, context_redirect in targets:
            io_obj = _Redirect.IOStuff(need_dup=bool(self.duplicate_out), dup=to_duplicate,
                                       on_thread=owner, **self.io_args)
            redirection = context_redirect(io_obj)
            redirection.__enter__()
            self.redirections.append((redirection, io_obj))
        return io_obj

    def __call__(self, to=None, format=None, max_buffer=None, buffer_separator='\n', regex=None, duplicate_out=False):
        """Return a new redirector for the same stream(s) with the given options."""
        return _Redirect(self.stdout, self.stderr, format=format, to=to, max_buffer=max_buffer,
                         buffer_separator=buffer_separator, regex=regex, duplicate_out=duplicate_out)

    def __exit__(self, *exc):
        """Restore the original stream(s) and render the final output."""
        nested = self.active_nested
        if nested is not None:
            # Unwind the innermost nested redirector first.
            if nested.active_nested is None:
                self.active_nested = None
            return nested.__exit__(*exc)

        pending = list(reversed(self.redirections))
        res = None
        try:
            # Restore every stream before rendering, so a rendering failure
            # cannot leave sys.stdout / sys.stderr redirected.
            for redirection, _ in pending:
                res = redirection.__exit__(*exc)
            for _, io_obj in pending:
                io_obj.print_at_end()
        finally:
            self.redirections = []
            self.st = None
            self.fun = None
        return res

    def _write(self, data):
        """Render ``data`` with the placeholder method selected by ``format``."""
        self.fun(data)
# Ready-made module-level redirectors. Use them directly as context managers
# or call them to obtain a separately configured redirector, e.g. stdout(to=...).
# Captures sys.stdout only (the default when neither stream flag is given).
stdout = _Redirect()
# Captures sys.stderr only.
stderr = _Redirect(stderr=True)
# Captures both sys.stdout and sys.stderr.
stdouterr = _Redirect(stdout=True, stderr=True)
'''
# can be used as
import time
import sys
from random import getrandbits
import streamlit.redirect as rd
st.text('Suboutput:')
so = st.empty()
with rd.stdout, rd.stderr(format='markdown', to=st.sidebar):
    print("hello ")
    time.sleep(1)
    i = 5
    while i > 0:
        print("**M**izu? ", file=sys.stdout if getrandbits(1) else sys.stderr)
        i -= 1
        with rd.stdout(to=so):
            print(f" cica {i}")
        if i:
            time.sleep(1)
# '''
@SidJain1412
Copy link

Here's an example @schaumb

import time
import streamlit as st
import redirect as rd
import random


# Sample strings; streaming_function() picks one at random and prints it character by character.
possible = ["1234", "5678", "wxyz", "#*%^", "abcd", "efgh", "ijkl"]

def streaming_function():
    """Print a randomly chosen sample string one character at a time, pausing 0.4 s after each."""
    chosen = random.choice(possible)
    for char in chosen:
        print(char)
        time.sleep(0.4)


st.text('Suboutput:')
# Placeholder that is passed to the redirector as its output target.
so = st.empty()
# Run the printing function while stdout is redirected to `so`.
with rd.stdout(to=so):
    streaming_function()

If you run this using streamlit run ... in multiple tabs simultaneously, you'll see these two things happen

  • Outputs getting mixed up
  • Type errors

@schaumb
Copy link
Author

schaumb commented Nov 8, 2023

Thanks @SidJain1412 .

I modified the gist with the following change: from now on, only the same thread's stdout is redirected; output from other threads is passed through unchanged. This solves the multi-session problem.

@schaumb
Copy link
Author

schaumb commented Nov 9, 2023

Other problems/features

  • Logs from sub-threads are no longer sent to Streamlit (previously they were shown in the browser)
  • When two tabs concurrently redirect stdout/stderr and do not finish in the reverse of the order in which they started, the redirections get mixed up

@SidJain1412
Copy link

Thanks @SidJain1412 .

I modified the gist with the following change: from now on, only the same thread's stdout is redirected; output from other threads is passed through unchanged. This solves the multi-session problem.

Awesome! Thanks a lot :D

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment