Created
June 29, 2025 09:19
-
-
Save BexTuychiev/ad725e4b18411de89f49100b0247f338 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import streamlit as st | |
| import requests | |
| import json | |
| import uuid | |
| from typing import Iterator, Dict, Any | |
| def setup_streamlit_page(): | |
| st.set_page_config( | |
| page_title="LangFlow Chat Interface", page_icon="π€", layout="wide" | |
| ) | |
| st.title("π€ LangFlow Chat Interface") | |
| def initialize_session_state(): | |
| if "messages" not in st.session_state: | |
| st.session_state.messages = [] | |
| if "session_id" not in st.session_state: | |
| st.session_state.session_id = str(uuid.uuid4()) | |
| if "flow_config" not in st.session_state: | |
| st.session_state.flow_config = { | |
| "url": "http://localhost:7868/api/v1/run/bdb3c53c-3529-4793-8eb6-9bc9fb41127d", | |
| "stream": True, | |
| } | |
| def show_sidebar(): | |
| with st.sidebar: | |
| st.header("π§ Configuration") | |
| # Flow URL input | |
| flow_url = st.text_input( | |
| "Flow URL", | |
| value=st.session_state.flow_config["url"], | |
| help="Your LangFlow API endpoint URL", | |
| ) | |
| # Streaming toggle | |
| enable_streaming = st.checkbox( | |
| "Enable Streaming", value=st.session_state.flow_config["stream"] | |
| ) | |
| # Update session state | |
| st.session_state.flow_config = {"url": flow_url, "stream": enable_streaming} | |
| st.divider() | |
| # Clear chat button | |
| if st.button("ποΈ Clear Chat History", use_container_width=True): | |
| st.session_state.messages = [] | |
| st.session_state.session_id = str(uuid.uuid4()) | |
| st.rerun() | |
| def stream_langflow_response(url: str, payload: Dict[str, Any]) -> Iterator[str]: | |
| """Stream response from LangFlow API""" | |
| stream_url = f"{url}?stream=true" | |
| headers = {"Content-Type": "application/json"} | |
| try: | |
| response = requests.post( | |
| stream_url, json=payload, headers=headers, stream=True, timeout=60 | |
| ) | |
| response.raise_for_status() | |
| previous_text = "" | |
| for line in response.iter_lines(): | |
| if line: | |
| try: | |
| event_data = json.loads(line.decode("utf-8")) | |
| if event_data.get("event") == "add_message": | |
| # Only process messages from the assistant, not user echoes | |
| if event_data["data"]["sender"] == "Machine": | |
| current_text = event_data["data"]["text"] | |
| if len(current_text) > len(previous_text): | |
| new_chunk = current_text[len(previous_text) :] | |
| previous_text = current_text | |
| yield new_chunk | |
| elif event_data.get("event") == "end": | |
| return | |
| except json.JSONDecodeError: | |
| continue | |
| except requests.exceptions.RequestException as e: | |
| yield f"Error: {str(e)}" | |
| def make_langflow_request(url: str, payload: Dict[str, Any]) -> str: | |
| """Make non-streaming request to LangFlow API""" | |
| headers = {"Content-Type": "application/json"} | |
| try: | |
| response = requests.post(url, json=payload, headers=headers, timeout=60) | |
| response.raise_for_status() | |
| result = response.json() | |
| # Extract message from LangFlow response structure | |
| if "outputs" in result and result["outputs"]: | |
| output = result["outputs"][0] | |
| if "outputs" in output and output["outputs"]: | |
| output_data = output["outputs"][0] | |
| if "results" in output_data and "message" in output_data["results"]: | |
| return output_data["results"]["message"]["text"] | |
| return "No response received" | |
| except requests.exceptions.RequestException as e: | |
| return f"Error: {str(e)}" | |
| def handle_user_input(user_message: str): | |
| # Add user message to history | |
| st.session_state.messages.append({"role": "user", "content": user_message}) | |
| # Display user message | |
| with st.chat_message("user"): | |
| st.markdown(user_message) | |
| # Prepare API payload | |
| payload = { | |
| "input_value": user_message, | |
| "output_type": "chat", | |
| "input_type": "chat", | |
| "session_id": st.session_state.session_id, | |
| } | |
| # Display assistant response with streaming | |
| with st.chat_message("assistant"): | |
| if st.session_state.flow_config["stream"]: | |
| response_placeholder = st.empty() | |
| full_response = "" | |
| for chunk in stream_langflow_response( | |
| st.session_state.flow_config["url"], payload | |
| ): | |
| full_response += chunk | |
| response_placeholder.markdown(full_response + "β") | |
| response_placeholder.markdown(full_response) | |
| else: | |
| # Handle non-streaming response | |
| response = make_langflow_request( | |
| st.session_state.flow_config["url"], payload | |
| ) | |
| st.markdown(response) | |
| full_response = response | |
| # Add assistant response to history | |
| st.session_state.messages.append({"role": "assistant", "content": full_response}) | |
| def main(): | |
| setup_streamlit_page() | |
| initialize_session_state() | |
| show_sidebar() | |
| # Display conversation history | |
| for message in st.session_state.messages: | |
| with st.chat_message(message["role"]): | |
| st.markdown(message["content"]) | |
| # Chat input | |
| if user_input := st.chat_input("Type your message here..."): | |
| handle_user_input(user_input) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment