Created
July 2, 2024 09:33
-
-
Save bhuiyanmobasshir94/ee1019863eb756aa89c59b14eda69818 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hmac
import json
import os

import requests
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
app = FastAPI()

# Credentials are read from the environment so that secrets never live in
# source control. A page access token was previously committed in this file;
# treat that token as compromised and rotate it.
# PAGE_ACCESS_TOKEN: token sent as ``access_token`` on Graph API calls.
# VERIFY_TOKEN: shared secret checked during the GET /webhook handshake; the
# historical placeholder value is kept as the default for local testing.
PAGE_ACCESS_TOKEN = os.environ.get("PAGE_ACCESS_TOKEN", "")
VERIFY_TOKEN = os.environ.get("VERIFY_TOKEN", "ABCDE")
class WebhookRequest(BaseModel):
    """Request body accepted by the POST /webhook endpoint."""

    # Top-level "object" field of the payload. The name shadows the builtin
    # only inside this class namespace; it has to match the incoming JSON key.
    object: str

    # Top-level "entry" field of the payload: a list whose items are not
    # validated by this model.
    entry: list
@app.get("/webhook")
async def verify_token(q: Request):
    """Answer the webhook verification handshake.

    Reads ``hub.verify_token`` and ``hub.challenge`` from the query string.
    When the supplied token matches ``VERIFY_TOKEN`` the challenge is echoed
    back as an integer.

    Args:
        q: The incoming request; only its query parameters are used.

    Returns:
        The value of ``hub.challenge`` converted to ``int``.

    Raises:
        HTTPException: 403 when ``hub.verify_token`` is missing or does not
            match; 400 when ``hub.challenge`` is missing or not an integer.
    """
    supplied_token = q.query_params.get("hub.verify_token")
    challenge = q.query_params.get("hub.challenge")

    # Compare as bytes with compare_digest: constant-time, and it avoids the
    # TypeError that compare_digest raises for non-ASCII str arguments.
    if supplied_token is None or not hmac.compare_digest(
        supplied_token.encode("utf-8"), VERIFY_TOKEN.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Forbidden")

    try:
        return int(challenge)
    except (TypeError, ValueError):
        # A missing (None) or non-numeric challenge is a client error, not a
        # server crash.
        raise HTTPException(
            status_code=400, detail="Invalid hub.challenge"
        ) from None
@app.post("/webhook")
async def handle_messages(request: WebhookRequest):
    """Echo every text message found in a webhook delivery back to its sender.

    Walks ``request.entry``, and for each item of an entry's ``"messaging"``
    list that carries both a sender id and a message text, logs the message
    and replies with ``"You messaged: <text>"`` via ``send_message``.
    Entries or events that are not dicts, or that lack a sender id or text,
    are skipped.

    Args:
        request: The validated webhook payload.

    Returns:
        ``{"status": "success"}`` once all events have been processed.
    """
    # Local import keeps this block self-contained; fastapi is already a
    # dependency of this module.
    from fastapi.concurrency import run_in_threadpool

    for entry in request.entry:
        if not isinstance(entry, dict):
            continue
        for message_event in entry.get("messaging") or []:
            if not isinstance(message_event, dict):
                continue
            # ``or {}`` also covers keys that are present but null.
            sender_id = (message_event.get("sender") or {}).get("id")
            message_text = (message_event.get("message") or {}).get("text")
            if not (sender_id and message_text):
                continue
            print(f"Received message from {sender_id}: {message_text}")
            response_message = f"You messaged: {message_text}"
            # send_message performs a blocking HTTP call; run it in a worker
            # thread so the event loop stays free to serve other requests.
            await run_in_threadpool(send_message, sender_id, response_message)
    return {"status": "success"}
def send_message(recipient_id, message_text, timeout=10):
    """Send a text message through the Graph API ``me/messages`` endpoint.

    Args:
        recipient_id: Id placed in the ``recipient.id`` field of the payload.
        message_text: Text placed in the ``message.text`` field.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` can block indefinitely.

    Returns:
        The decoded JSON body of the response, or ``{}`` when the request
        could not be completed or the body is not valid JSON. Failures are
        reported on stdout rather than raised.
    """
    url = "https://graph.facebook.com/v12.0/me/messages"
    payload = {"recipient": {"id": recipient_id}, "message": {"text": message_text}}
    params = {"access_token": PAGE_ACCESS_TOKEN}

    try:
        # ``json=`` already sets the Content-Type: application/json header.
        response = requests.post(url, params=params, json=payload, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to send message: {exc}")
        return {}

    if response.status_code != 200:
        print(f"Failed to send message: {response.status_code}, {response.text}")

    try:
        return response.json()
    except ValueError:
        # Error pages are not guaranteed to be JSON.
        return {}
# Script entry point: serve the app on all interfaces, port 8000.
# CLI alternative: uvicorn main:app --host 0.0.0.0 --port 8000
if __name__ == "__main__":
    from uvicorn import run as serve

    bind = {"host": "0.0.0.0", "port": 8000}
    serve(app, **bind)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment