import json
import os
from datetime import datetime
from typing import Dict

import requests
import whois
from agents import Agent, Runner, function_tool, WebSearchTool
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai.embeddings import OpenAIEmbeddings
from markdownify import markdownify as md
from typing_extensions import TypedDict, Any
|
|
|
class MarkdownRequest(TypedDict):
    # Argument payload accepted by the `get_markdown` tool.
    url: str  # address of the page to fetch
    segment: int  # zero-based index of the segment to return
|
|
|
class MarkdownResponse(TypedDict):
    # Result payload produced by the `get_markdown` tool.
    text: str  # markdown text of the requested segment
    segment: int  # index of the segment that was returned
    segments: int  # total number of segments the page was split into
|
|
|
@function_tool
def say_to_user(message: str) -> None:
    """
    Show an intermediate message to the user.

    Args:
        message: Message to show
    """
    # The docstring above doubles as the tool description handed to the model,
    # so it is kept short and typo-free.
    print(f"Model said: {message}")
|
|
|
@function_tool
def show_final_report(report: str) -> None:
    """
    Show final report to user before stop

    Args:
        report: Full report text without any assumptions and requests for new data

    Example:
        ## Risk Assessment: [Low/Medium/High]

        Example.com presents as a [risk level] client because [key reasons]. The domain was registered [timeframe] ago and [key observations about website]. Their registration IP is [consistency with website location].

        ## Detailed Findings
        | Step | Description | Result | Red Flag |
        |------|-------------|--------|----------|
        | 1 | Website Status | Operational | No |
        | 2 | Company Category | Flowers store | No |
        | 3 | Dummy Content | None found | No |
        | 4 | Button Functionality | Functional | No |
        | 5 | Privacy and ToS | Open correctly | No |
        | 6 | Social Media Links | Present. Facebook and Twitter | No |
        | 7 | Domain Registration Date | More than 6 months ago | No |
        | 8 | Physical Office Address | Some street, Some city, Some country | Yes |
        | 9 | VPN/Proxy Usage | VPN detected | Yes |
        | 10 | Country Consistency | Consistent with IP | No |
        | 11 | Linkedin search | No information found | Yes |
    """
    # NOTE(review): the example table above lists 11 steps, while the example
    # in the agent prompt lists 8 - confirm which layout is the intended one.

    # Banner lines are printed verbatim around the report so it stands out
    # from the intermediate "Model said:" messages.
    print("========================= REPORT ======================")
    print(report)
    print("======================= END REPORT ======================")
|
|
|
|
|
@function_tool
def whois_lookup(domain: str) -> Dict:
    """
    Whois tool to retrieve full whois information about domain

    Args:
        domain: Domain name ex: example.com
    """
    try:
        # NOTE(review): this returns the attribute dict of the lookup result
        # as-is; confirm it carries the fields the agent is expected to read.
        return whois.whois(domain).__dict__
    except Exception as e:
        # Return the message, not the exception object: the tool result must
        # be plain data that can be serialised and shown to the model.
        return {"error": str(e)}
|
|
|
def vpnapi_tool(ip: str, api_key: str = "") -> Dict:
    """
    Query vpnapi.io for VPN/proxy/location information about an IP address.

    Args:
        ip: IP address to look up.
        api_key: vpnapi.io API key. When empty, the ``VPNAPI_KEY`` environment
            variable is used, falling back to the original placeholder value.

    Returns:
        The decoded JSON response, or ``{"error": "<message>"}`` when the
        request or the JSON decoding fails.
    """
    key = api_key or os.environ.get("VPNAPI_KEY", "INSERT_YOUR_KEY")
    try:
        response = requests.get(
            f"https://vpnapi.io/api/{ip}",
            params={"key": key},
            timeout=10,
        )
        return response.json()
    except Exception as e:
        # Best-effort lookup: report the failure as plain text so the result
        # stays JSON-serialisable instead of carrying an exception object.
        return {"error": str(e)}
|
|
|
def whois_date_lookup(domain: str) -> str:
    """
    Return the WHOIS creation date of ``domain`` as a string.

    Args:
        domain: Domain name to look up, e.g. ``example.com``.

    Returns:
        The creation date rendered with ``str()``, or a string starting with
        ``"WHOIS error:"`` when the lookup fails or the record has no
        creation date.
    """
    try:
        created = whois.whois(domain)['creation_date']
    except Exception as e:
        return f"WHOIS error: {e}"

    # The WHOIS library may report several creation dates as a list; keep the
    # first usable entry so the caller always receives a single value.
    if isinstance(created, (list, tuple)):
        created = next((item for item in created if item is not None), None)

    if created is None:
        return "WHOIS error: creation date not available"

    # Honour the declared ``-> str`` contract instead of leaking datetime objects.
    return str(created)
|
|
|
def extract_domain(email: str) -> str:
    """
    Extract the domain part (text after the last ``@``) from an email address.

    The result is stripped of surrounding whitespace and lower-cased, since
    domain names are case-insensitive and stray whitespace breaks lookups.
    Input without an ``@`` is returned as-is (normalised), as before.

    Args:
        email: Email address, e.g. ``user@example.com``.

    Returns:
        The normalised domain, or a string starting with
        ``"Domain extract error:"`` when ``email`` is not a string.
    """
    try:
        return email.rpartition('@')[2].strip().lower()
    except (AttributeError, TypeError) as e:
        # Only non-string input can fail here (None, bytes, numbers, ...).
        return f"Domain extract error: {e}"
|
|
|
|
|
# In-process cache: page URL -> list of documents produced by `text_splitter`.
markdown_cache: dict = {}

# Splitter that cuts page markdown into segments. Choose here which embeddings,
# breakpoint type and threshold to use.
text_splitter = SemanticChunker(
    OpenAIEmbeddings(),
    breakpoint_threshold_type='percentile',
    breakpoint_threshold_amount=90,
)
|
|
|
|
|
@function_tool
def get_markdown(req: MarkdownRequest) -> MarkdownResponse:
    """
    Read URL as markdown and respond in segments

    Args:
        req: MarkdownRequest
            url: Url to be scraped
            segment: Segment id to be read starts with 0

    Response:
        MarkdownResponse:
            text: Text of the segment
            segment: segment id of file
            segments: total count of segments
    """
    url = req['url']
    segment = req['segment']
    print(f"Get page {url} -> {segment}")

    if url in markdown_cache:
        print(f"Load markdown from cache for {url}:{segment}")
    else:
        # First request for this URL: fetch, split and remember the segments
        # so follow-up segment requests do not re-download the page.
        markdown_full = get_markdown_raw(url)
        print(f"Splitting #{len(markdown_full)}")
        markdown_cache[url] = text_splitter.create_documents([markdown_full])
        print("Splitted")
        print(f"Got {len(markdown_cache[url])} documents for {url}")

    docs = markdown_cache[url]
    if not 0 <= segment < len(docs):
        # Fail with an explicit message rather than a bare list IndexError
        # (or a silently wrapped negative index).
        raise IndexError(
            f"Segment {segment} is out of range: {url} has {len(docs)} segments"
        )

    # Cached and freshly fetched pages share this single return path, so both
    # yield a MarkdownResponse (the cached path used to return the raw document).
    return MarkdownResponse(
        text=docs[segment].page_content,
        segment=segment,
        segments=len(docs),
    )
|
|
|
|
|
|
|
def get_markdown_raw(url: str) -> str:
    """
    Get markdown representation for page with given URL

    Args:
        url: URL of the page to get content

    Returns:
        The page body converted to markdown, or a string starting with
        ``"Exception:"`` when the download or the conversion fails.
    """
    print(f"DEBUG: GET MARKDOWN OF {url}")
    try:
        response = requests.get(url, timeout=10)
        return md(response.text)
    except Exception as e:
        # Best-effort fetch: the failure is reported as text, not raised.
        # NOTE(review): callers cannot tell this text from real page content.
        return f"Exception: {e}"
|
|
|
|
|
# System instructions for the "Investigator" agent: role, input schema,
# step-by-step workflow, tool usage and the expected report layout.
prompt = """
# Role

You are a compliance officer who checks new clients by the email from which they registered.
You always check for red flags in websites to avoid fraudsters.

# Input format

You have the following input as json:

{
    "email": str // user registered email
    "domain": str // domain extracted from email
    "domain_regdate": str // domain registration date
    "current_date": str // today's date (YYYY-MM-DD), use it to compute domain age
    "vpnapi_response": {
        ...
        "security": {
            "vpn": bool, // Is IP a known VPN server
            "proxy": bool, // Is IP a known Proxy server
            "tor": bool, // Is IP a known TOR exit proxy
            "relay": bool // Is IP a known Relay
        },
        "location": {
            "city": str // City
            "country": str // Country
        }
    }
}

# Workflow

1. Check if the website is operational and not parked by reading its content.
** IMPORTANT: ** Stop if website is not operational.
** IMPORTANT: ** Stop if website is well-known email service
2. Extract company name
3. Search for links to Privacy Policy and About in source code.
4. Extract company category from website.
5. Check for computer generated content.
6. Check domain registration date. Mark as red flag if domain registered less than 6 months ago.
7. Check if website contains office address (search in footer, on About page or Contacts page).
8. Compare the country from vpnapi_response with the website physical address.


## Page reading workflow

Follow exactly these steps to read a full document:

1. Get markdown for the url starting from segment 0 and examine the response.
2. Read the whole document segment by segment while the current segment id is less than the segments count.
3. Repeat steps 1 and 2 for every page you need to read.

# Task:

**CRITICAL:** Do workflow precisely step by step for provided input.

After every step evaluate results and write red flags if you found any.
If you do not have any information output N/A for that step.
Check risk of working with this customer for VoIP company.
When you get answers to all questions write final report and stop.

**IMPORTANT:** After every step you write step results with tool say_to_user.

# Using Tools

When reporting intermediate results, use the `say_to_user` tool in this format:

```
say_to_user(message="Step X: [Brief description of finding]")
```

When reporting final report, use the `show_final_report` tool in this format:
```
show_final_report(report="...")
```

For internet search use search tool

For full WHOIS information about a domain use the `whois_lookup` tool

When you need the whole content of a page with all links, use the `get_markdown` tool

# Risk Assessment

Based on your findings, classify the client into one of these risk categories:
- **Low Risk**
- **Medium Risk**
- **High Risk**


# Output Format

1. Provide a one-paragraph summary of your risk assessment in markdown format.
2. Include a table with results for each step:

Example output:
```md
## Risk Assessment: [Low/Medium/High]

Example.com presents as a [risk level] client because [key reasons]. The domain was registered [timeframe] ago and [key observations about website]. Their registration IP is [consistency with website location].

## Detailed Findings
| Step | Description | Result | Red Flag |
|------|-------------|--------|----------|
| 1 | Website Status | Operational | No |
| 2 | Company Category | Flowers store | No |
| 3 | Generated Content | None found | No |
| 4 | About and Contacts | Open correctly | No |
| 5 | Domain Registration Date | More than 6 months ago | No |
| 6 | Physical Office Address | Some street, Some city, Some country | Yes |
| 7 | VPN/Proxy Usage | VPN detected | Yes |
| 8 | Country Consistency | Consistent with IP | No |
```

"""
|
|
|
|
|
def main(users=None):
    """
    Run the compliance examination for every user in ``users``.

    For each user the email domain, its WHOIS creation date and the vpnapi.io
    result for the registration IP are collected and handed to the
    "Investigator" agent as a JSON document. The agent reports through its
    tools (`say_to_user`, `show_final_report`), so the run result itself is
    not used. A failure for one user is printed and does not stop the others.

    Args:
        users: Iterable of ``{"email": ..., "ip": ...}`` dicts. Defaults to
            the built-in sample list.
    """
    if users is None:
        users = [
            # NOTE(review): "[email protected]" looks like an e-mail
            # obfuscation placeholder, not a real address - replace it with
            # the address that should actually be examined.
            {"email": "[email protected]", "ip": "160.79.104.10"},
        ]

    # Create the agent once; its configuration does not depend on the user.
    agent = Agent(
        name="Investigator",
        instructions=prompt,
        tools=[say_to_user, get_markdown, whois_lookup, show_final_report, WebSearchTool()],
        model="gpt-4o-mini",
    )

    for user in users:
        print(f"Start examining {user['email']}")
        try:
            domain = extract_domain(user["email"])
            vpnapi_response = vpnapi_tool(user["ip"])

            # Named `payload` so the builtin `input` is not shadowed.
            payload = {
                "email": user["email"],
                "domain": domain,
                "domain_regdate": whois_date_lookup(domain),
                "vpnapi_response": vpnapi_response,
                "current_date": datetime.today().strftime('%Y-%m-%d'),
            }

            # default=str renders non-JSON values (e.g. datetimes) as text.
            Runner.run_sync(agent, json.dumps(payload, default=str), max_turns=50)
        except Exception as e:
            # Top-level boundary: report and continue with the next user.
            print(f"Failed to execute examination: {e}")
|
|
|
|
|
|
|
# Run the examination only when executed as a script, not on import.
if __name__ == "__main__":
    main()