Created April 18, 2024 14:00
LangGraph generates a Python script to control an already-running Chrome browser
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
import json, urllib.request, websockets, asyncio
from typing import TypedDict, List
import textwrap, html
from bs4 import BeautifulSoup, Tag, NavigableString, Comment
import os
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

def generate_simplified_dom(node):
    # Recursively strip a page down to its interactive/labelled elements so it fits in an LLM prompt.
    # Accepts either a full BeautifulSoup document (starts from <body>) or an element during recursion.
    element = node.body if isinstance(node, BeautifulSoup) and node.body is not None else node
    # Handle text nodes
    if isinstance(element, NavigableString):
        return element if element.strip() else None
    # Only handle tag nodes from here
    if not isinstance(element, Tag):
        return None
    # Remove unwanted tags
    for unwanted_tag in ['script', 'style', 'path', 'iframe', 'footer']:
        for tag in element.find_all(unwanted_tag):
            tag.decompose()
    # Remove comments
    for comment in element.find_all(string=lambda text: isinstance(text, Comment)):
        comment.extract()
    # Check visibility
    is_visible = 'display: none' not in element.get('style', '')
    # Generate simplified DOM for children
    children = filter(None, (generate_simplified_dom(c) for c in element.children) if is_visible else [])
    # Don't bother with text that is the direct child of the body
    if element.name == 'body':
        children = [c for c in children if not isinstance(c, NavigableString)]
    # Check interactivity and labels
    interactive = any(attr in element.attrs for attr in ('onclick', 'oninput', 'onchange'))
    has_label = element.has_attr('aria-label') or element.has_attr('name')
    is_link = element.name == 'a'
    include_node = interactive or has_label or is_link
    # Collapse empty and single-child nodes that carry no information of their own
    if not include_node:
        children = list(children)
        if not children:
            return None
        if len(children) == 1:
            return children[0]
    # Clone the node and append simplified children, keeping only the attributes useful to the LLM
    container = BeautifulSoup('<{}>'.format(element.name), 'html.parser').find(element.name)
    for attr in ('aria-label', 'data-name', 'name', 'type', 'placeholder', 'value', 'role', 'title', 'href'):
        if element.has_attr(attr):
            container[attr] = element[attr]
    if interactive:
        container['id'] = element.get('id', '')
    for child in children:
        container.append(child)
    return html.unescape(textwrap.dedent(container.prettify())).replace('\n', '')

concatenated_content = ''
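# Optional sketch (not part of the original gist): if a previous run already saved a page
# snapshot via the reference code below, its simplified DOM could be used as context for the
# model instead of the empty string, e.g.
# if os.path.exists("domSaved.html"):
#     with open("domSaved.html", encoding="utf-8") as f:
#         concatenated_content = str(generate_simplified_dom(BeautifulSoup(f.read(), "html.parser")) or "")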
# Reference code shown to the model. Braces are doubled at the end so that
# ChatPromptTemplate does not treat them as template variables.
example = '''
async def open_url_in_new_tab_and_get_html(url):
    debug_url = 'http://localhost:9222/json'
    response = urllib.request.urlopen(debug_url).read().decode('utf-8')
    ws_debug_url = json.loads(response)[0]['webSocketDebuggerUrl']
    async with websockets.connect(ws_debug_url) as ws:
        create_target_message = json.dumps({"id": 1, "method": "Target.createTarget", "params": {"url": url}})
        await ws.send(create_target_message)
        create_target_response = await ws.recv()
        target_id = json.loads(create_target_response)['result']['targetId']
        attach_message = json.dumps({"id": 2, "method": "Target.attachToTarget", "params": {"targetId": target_id}})
        await ws.send(attach_message)
        new_tab_debug_url = [tab['webSocketDebuggerUrl'] for tab in json.loads(urllib.request.urlopen(debug_url).read().decode('utf-8')) if tab['id'] == target_id][0]
        async with websockets.connect(new_tab_debug_url) as new_tab_ws:
            await asyncio.sleep(5)  # Wait for the page to load
            get_html_message = json.dumps(
                {"id": 1, "method": "Runtime.evaluate", "params": {"expression": "document.documentElement.outerHTML"}})
            await new_tab_ws.send(get_html_message)
            html_response = await new_tab_ws.recv()
            html_content = json.loads(html_response)["result"]["result"]["value"]
            with open("domSaved.html", "w", encoding="utf-8") as file:
                file.write(html_content)
            return html_content
url2open = "https://www.microsoft.com"
asyncio.run(open_url_in_new_tab_and_get_html(url2open))'''.replace('{', '{{').replace('}', '}}')

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        error : Binary flag for control flow to indicate whether the test error was tripped
        messages : With user question, error messages, reasoning
        generation : Code solution
        iterations : Number of tries
    """
    error: str
    messages: List
    generation: str
    iterations: int

### OpenAI

# Code generation prompt (the reference example is prepended to the system message)
code_gen_prompt = ChatPromptTemplate.from_messages(
    [("system", example + """You are a programmer writing a Python script that controls a running browser through the Chrome DevTools Protocol on port 9222. \n
    Modify the reference code above to answer the question from the user.
    {context}
    Structure your answer: 1) a prefix describing the code solution, 2) the imports, 3) the functioning code block. \n
    Invoke the code tool to structure the output correctly. \n Here is the user question:"""),
     ("placeholder", "{messages}")]
)

# Data model
class code(BaseModel):
    """Code output"""
    prefix: str = Field(description="Description of the problem and approach")
    imports: str = Field(description="Code block import statements")
    code: str = Field(description="Code block not including import statements")
    description = "Schema for code solutions to questions about CDP."

expt_llm = "gpt-3.5-turbo-0125"
llm = ChatOpenAI(temperature=0, model=expt_llm, api_key=os.environ['OPENAI_API_KEY'],
                 base_url=os.environ['OPENAI_BASE_URL'])
code_gen_chain = code_gen_prompt | llm.with_structured_output(code)
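# Illustration only (hypothetical call, not executed as part of the graph): the structured-output
# chain returns a `code` instance whose fields the nodes below consume.
# solution = code_gen_chain.invoke({"context": "", "messages": [("user", "open https://example.com in a new tab")]})
# print(solution.prefix)   # natural-language description of the approach
# print(solution.imports)  # import statements
# print(solution.code)     # code block without imports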
### Parameters

# Max tries
max_iterations = 3
# Reflect on failures before regenerating ('reflect') or retry directly
# flag = 'reflect'
flag = 'do not reflect'

### Nodes

def generate(state: GraphState):
    """
    Generate a code solution

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation
    """
    print("---GENERATING CODE SOLUTION---")
    # State
    messages = state["messages"]
    iterations = state["iterations"]
    error = state["error"]
    # We have been routed back to generation with an error
    if error == "yes":
        messages += [("user",
                      "Now, try again. Invoke the code tool to structure the output with a prefix, imports, and code block:")]
    # Solution
    code_solution = code_gen_chain.invoke({"context": concatenated_content, "messages": messages})
    messages += [
        ("assistant", f"{code_solution.prefix} \n Imports: {code_solution.imports} \n Code: {code_solution.code}")]
    # Increment
    iterations = iterations + 1
    return {"generation": code_solution, "messages": messages, "iterations": iterations}

def code_check(state: GraphState):
    """
    Check code

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, error
    """
    print("---CHECKING CODE---")
    # State
    messages = state["messages"]
    code_solution = state["generation"]
    iterations = state["iterations"]
    # Get solution components
    imports = code_solution.imports
    code = code_solution.code
    # Check imports
    try:
        exec(imports)
    except Exception as e:
        print("---CODE IMPORT CHECK: FAILED---")
        error_message = [("user", f"Your solution failed the import test: {e}")]
        messages += error_message
        return {"generation": code_solution, "messages": messages, "iterations": iterations, "error": "yes"}
    # Check execution
    try:
        print(imports + "\n" + code)
        exec(imports + "\n" + code)
    except Exception as e:
        print("---CODE BLOCK CHECK: FAILED---")
        error_message = [("user", f"Your solution failed the code execution test: {e}")]
        messages += error_message
        return {"generation": code_solution, "messages": messages, "iterations": iterations, "error": "yes"}
    # No errors
    print("---NO CODE TEST FAILURES---")
    return {"generation": code_solution, "messages": messages, "iterations": iterations, "error": "no"}

def reflect(state: GraphState):
    """
    Reflect on errors

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation
    """
    print("---REFLECTING ON ERRORS---")
    # State
    messages = state["messages"]
    iterations = state["iterations"]
    code_solution = state["generation"]
    # Prompt reflection
    reflection_message = [("user", """You tried to solve this problem and failed a unit test. Reflect on this failure
    given the provided documentation. Write a few key suggestions based on the
    documentation to avoid making this mistake again.""")]
    messages += reflection_message
    # Add reflection
    reflections = code_gen_chain.invoke({"context": concatenated_content, "messages": messages})
    messages += [("assistant", f"Here are reflections on the error: {reflections}")]
    return {"generation": code_solution, "messages": messages, "iterations": iterations}

### Edges

def decide_to_finish(state: GraphState):
    """
    Determines whether to finish.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """
    error = state["error"]
    iterations = state["iterations"]
    if error == "no" or iterations == max_iterations:
        print("---DECISION: FINISH---")
        return "end"
    else:
        print("---DECISION: RE-TRY SOLUTION---")
        if flag == 'reflect':
            return "reflect"
        else:
            return "generate"

from langgraph.graph import END, StateGraph

workflow = StateGraph(GraphState)
# Define the nodes
workflow.add_node("generate", generate)  # generate solution
workflow.add_node("check_code", code_check)  # check code
workflow.add_node("reflect", reflect)  # reflect
# Build graph
workflow.set_entry_point("generate")
workflow.add_edge("generate", "check_code")
workflow.add_conditional_edges(
    "check_code",
    decide_to_finish,
    {
        "end": END,
        "reflect": "reflect",
        "generate": "generate",
    },
)
workflow.add_edge("reflect", "generate")
app = workflow.compile()

question = """Open https://twitter.com/i/lists/1733652180576686386"""
# Seed "error" so generate() can read state["error"] on the first pass
app.invoke({"messages": [("user", question)], "iterations": 0, "error": ""})
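# Usage notes (assumptions, not stated in the gist): Chrome must already be running with
# remote debugging enabled, e.g. launched as
#     chrome --remote-debugging-port=9222
# so that http://localhost:9222/json (used by the reference code and by the generated script)
# is reachable when code_check exec()s the solution, and OPENAI_API_KEY / OPENAI_BASE_URL
# must be available in the environment (a .env file is loaded via python-dotenv above).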