Created
June 24, 2025 04:00
-
-
Save jongan69/7273eeca6faa85a933bd1d429d9a71a3 to your computer and use it in GitHub Desktop.
Damn you Google, damn you
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fastapi import FastAPI, HTTPException | |
from pydantic import BaseModel, Field | |
from typing import List, Optional, Dict, Any | |
from fast_hotels import HotelData, Guests, get_hotels | |
from fast_flights import FlightData, Passengers, get_flights | |
import logging | |
import re | |
import pandas as pd | |
# Helper functions for parsing price and stops
def _parse_price(price): | |
if isinstance(price, str): | |
# Remove any non-numeric characters except dot and comma | |
match = re.search(r"[\d,.]+", price) | |
if match: | |
# Remove commas and convert to float | |
return float(match.group(0).replace(",", "")) | |
return None | |
return price | |
def _parse_stops(stops): | |
try: | |
if stops is None: | |
return None | |
if isinstance(stops, int): | |
return stops | |
# Try to convert to int if it's a string representation of a number | |
return int(stops) | |
except (ValueError, TypeError): | |
return None | |
# Application object that the route decorators below register against.
app = FastAPI()

# Configure the root logger to emit records at INFO level and above.
logging.basicConfig(level=logging.INFO)
# Hotel search models
class HotelSearchRequest(BaseModel):
    # Request body for POST /hotels/search.

    # Stay window and place; handed to HotelData unchanged.
    checkin_date: str
    checkout_date: str
    location: str

    # Party size: at least one adult is required, children default to zero.
    adults: int = Field(..., ge=1)
    children: int = Field(default=0, ge=0)

    # Search options passed straight through to get_hotels.
    fetch_mode: str = Field(default="live", description="'live' for scraping, 'local' for mock data")
    limit: int = Field(default=3, ge=1)
    debug: bool = False
class HotelInfo(BaseModel):
    # One hotel entry as returned to API clients.

    name: str
    # The remaining fields are read from the search result with
    # getattr(..., None), so each of them may be None.
    price: Optional[float]
    rating: Optional[float]
    url: Optional[str]
    amenities: Optional[List[str]]
class HotelSearchResponse(BaseModel):
    # Response body for POST /hotels/search.

    hotels: List[HotelInfo]
    # Copied from the get_hotels result when present, otherwise None.
    lowest_price: Optional[float]
    current_price: Optional[float]
# Flight search models
class FlightSearchRequest(BaseModel):
    # Request body for POST /flights/search.

    # Single leg to search; handed to FlightData unchanged.
    date: str
    from_airport: str
    to_airport: str

    # Fare options passed straight through to get_flights.
    trip: str = Field(default="one-way", description="'one-way' or 'round-trip'")
    seat: str = Field(default="economy", description="'economy', 'business', etc.")

    # Passenger counts: at least one adult, every other group defaults to zero.
    adults: int = Field(..., ge=1)
    children: int = Field(default=0, ge=0)
    infants_in_seat: int = Field(default=0, ge=0)
    infants_on_lap: int = Field(default=0, ge=0)

    fetch_mode: str = Field(default="fallback", description="'fallback', 'live', etc.")
class FlightInfo(BaseModel):
    # One flight entry as returned to API clients.

    name: str
    departure: str
    arrival: str
    # Optional details; None when the search result does not carry them.
    arrival_time_ahead: Optional[str]
    duration: Optional[str]
    # Normalised with _parse_stops before the model is built.
    stops: Optional[int]
    delay: Optional[str]
    # Normalised with _parse_price before the model is built.
    price: Optional[float]
    is_best: Optional[bool]
class FlightSearchResponse(BaseModel):
    # Response body for POST /flights/search.

    flights: List[FlightInfo]
    # Copied from the get_flights result when present, otherwise None.
    current_price: Optional[str]
class HotelPreferences(BaseModel):
    # Optional hotel filters used by POST /trip/plan; a filter that is unset
    # (or falsy) is not applied.

    # Minimum rating a hotel must have to be kept.
    star_rating: Optional[int] = None
    # Upper bound on the hotel's price.
    max_price_per_night: Optional[float] = None
    # Every listed amenity must appear in the hotel's amenities.
    # typing.List (not builtin list[...]) to match the other models and to
    # stay importable on Python versions without builtin generics.
    amenities: Optional[List[str]] = None
class TripPlanRequest(BaseModel):
    # Request body for POST /trip/plan.

    # Route: used as the flight's airports; destination is also the hotel location.
    origin: str
    destination: str

    # depart_date is the flight date and hotel check-in; return_date is the
    # hotel check-out.
    depart_date: str
    return_date: str

    # Party size: at least one adult, children default to zero.
    adults: int = Field(..., ge=1)
    children: int = Field(default=0, ge=0)

    # Optional constraints; None means "no constraint".
    hotel_preferences: Optional[HotelPreferences] = None
    max_total_budget: Optional[float] = None
class TripPlanResponse(BaseModel):
    # Response body for POST /trip/plan.

    # Cheapest priced flight / hotel found, or None when nothing qualified.
    best_flight: Optional[FlightInfo]
    best_hotel: Optional[HotelInfo]

    # Flight cost for all travellers plus hotel cost for the whole stay.
    total_estimated_cost: Optional[float]
    # None when the stay has no nights to divide by.
    per_person_per_day: Optional[float]

    # Holds the keys "flight", "hotel", "nights", "adults" and "children".
    breakdown: Dict[str, Any]
    # Set only when the estimate exceeds the requested budget.
    suggestions: Optional[str]
@app.post("/hotels/search", response_model=HotelSearchResponse)
def search_hotels(req: HotelSearchRequest):
    """Search hotels for one location and date range.

    Returns the hotels found together with the ``lowest_price`` and
    ``current_price`` reported by the search (``null`` when absent).
    Any failure is reported as HTTP 400 with the error text as ``detail``.
    """
    try:
        stay = HotelData(
            checkin_date=req.checkin_date,
            checkout_date=req.checkout_date,
            location=req.location,
        )
        result = get_hotels(
            hotel_data=[stay],
            guests=Guests(adults=req.adults, children=req.children),
            fetch_mode=req.fetch_mode,
            debug=req.debug,
            limit=req.limit,
        )
        # Only ``name`` is read directly; the other attributes are optional on
        # the result objects and fall back to None.
        hotels = [
            HotelInfo(
                name=h.name,
                price=getattr(h, 'price', None),
                rating=getattr(h, 'rating', None),
                url=getattr(h, 'url', None),
                amenities=getattr(h, 'amenities', None),
            )
            for h in result.hotels
        ]
        return HotelSearchResponse(
            hotels=hotels,
            lowest_price=getattr(result, 'lowest_price', None),
            current_price=getattr(result, 'current_price', None),
        )
    except Exception as e:
        # logging.exception records the traceback, which the plain error
        # message alone does not.
        logging.exception("Hotel search error: %s", e)
        raise HTTPException(status_code=400, detail=str(e)) from e
@app.post("/flights/search", response_model=FlightSearchResponse)
def search_flights(req: FlightSearchRequest):
    """Search flights for a single leg.

    Returns the flights found, with stop counts and prices normalised to
    numbers where possible, together with the ``current_price`` reported by
    the search (``null`` when absent). Any failure is reported as HTTP 400
    with the error text as ``detail``.
    """
    try:
        leg = FlightData(
            date=req.date,
            from_airport=req.from_airport,
            to_airport=req.to_airport,
        )
        passengers = Passengers(
            adults=req.adults,
            children=req.children,
            infants_in_seat=req.infants_in_seat,
            infants_on_lap=req.infants_on_lap,
        )
        result = get_flights(
            flight_data=[leg],
            trip=req.trip,
            seat=req.seat,
            passengers=passengers,
            fetch_mode=req.fetch_mode,
        )
        # ``name``, ``departure`` and ``arrival`` are read directly; the other
        # attributes are optional on the result objects and fall back to None.
        flights = [
            FlightInfo(
                name=f.name,
                departure=f.departure,
                arrival=f.arrival,
                arrival_time_ahead=getattr(f, 'arrival_time_ahead', None),
                duration=getattr(f, 'duration', None),
                stops=_parse_stops(getattr(f, 'stops', None)),
                delay=getattr(f, 'delay', None),
                price=_parse_price(getattr(f, 'price', None)),
                is_best=getattr(f, 'is_best', None),
            )
            for f in result.flights
        ]
        return FlightSearchResponse(
            flights=flights,
            current_price=getattr(result, 'current_price', None),
        )
    except Exception as e:
        # logging.exception records the traceback, which the plain error
        # message alone does not.
        logging.exception("Flight search error: %s", e)
        raise HTTPException(status_code=400, detail=str(e)) from e
@app.post("/trip/plan", response_model=TripPlanResponse)
def plan_trip(req: TripPlanRequest):
    """Pick the cheapest flight and hotel for a trip and estimate its cost.

    The flight is searched as a one-way economy fare on ``depart_date``;
    hotels are searched at ``destination`` from ``depart_date`` to
    ``return_date`` and narrowed by ``hotel_preferences`` when given. The
    cheapest priced flight and hotel are returned with a cost breakdown, plus
    a suggestion when ``max_total_budget`` is exceeded.

    Errors: HTTP 400 for invalid or reversed dates and for unexpected
    failures; HTTP 502 when the flight or hotel search itself fails.
    """
    try:
        # Work out the stay length first so bad dates are rejected before any
        # flight or hotel search runs.
        depart = pd.to_datetime(req.depart_date)
        back = pd.to_datetime(req.return_date)
        if pd.isna(depart) or pd.isna(back):
            raise HTTPException(status_code=400, detail="depart_date and return_date must be valid dates")
        nights = (back - depart).days
        if nights < 0:
            # A negative night count would otherwise yield a negative hotel cost.
            raise HTTPException(status_code=400, detail="return_date must not be earlier than depart_date")

        # Search for flights
        # NOTE(review): only the depart_date leg is searched and priced
        # (trip="one-way"); no flight on return_date is included in the total.
        try:
            flight_data = [FlightData(
                date=req.depart_date,
                from_airport=req.origin,
                to_airport=req.destination,
            )]
            passengers = Passengers(
                adults=req.adults,
                children=req.children,
                infants_in_seat=0,
                infants_on_lap=0,
            )
            flight_result = get_flights(
                flight_data=flight_data,
                trip="one-way",
                seat="economy",
                passengers=passengers,
                fetch_mode="local",
            )
            flights = [
                FlightInfo(
                    name=f.name,
                    departure=f.departure,
                    arrival=f.arrival,
                    arrival_time_ahead=getattr(f, 'arrival_time_ahead', None),
                    duration=getattr(f, 'duration', None),
                    stops=_parse_stops(getattr(f, 'stops', None)),
                    delay=getattr(f, 'delay', None),
                    price=_parse_price(getattr(f, 'price', None)),
                    is_best=getattr(f, 'is_best', None),
                )
                for f in flight_result.flights
            ]
            # Flights without a price cannot be ranked and are ignored.
            best_flight = min(
                (f for f in flights if f.price is not None),
                key=lambda x: x.price,
                default=None,
            )
        except Exception as e:
            logging.exception("Flight search error in /trip/plan: %s", e)
            raise HTTPException(status_code=502, detail=f"Flight search failed: {e}") from e

        # Search for hotels
        try:
            hotel_data = [HotelData(
                checkin_date=req.depart_date,
                checkout_date=req.return_date,
                location=req.destination,
            )]
            guests = Guests(adults=req.adults, children=req.children)
            hotel_result = get_hotels(
                hotel_data=hotel_data,
                guests=guests,
                fetch_mode="live",
                debug=False,
                limit=10,
            )
            hotels = [
                HotelInfo(
                    name=h.name,
                    price=getattr(h, 'price', None),
                    rating=getattr(h, 'rating', None),
                    url=getattr(h, 'url', None),
                    amenities=getattr(h, 'amenities', None),
                )
                for h in hotel_result.hotels
            ]
            # Filter hotels by preferences; a hotel lacking the attribute a
            # filter inspects is dropped by that filter.
            filtered_hotels = hotels
            prefs = req.hotel_preferences
            if prefs:
                if prefs.star_rating:
                    filtered_hotels = [
                        h for h in filtered_hotels
                        if h.rating and h.rating >= prefs.star_rating
                    ]
                if prefs.max_price_per_night:
                    filtered_hotels = [
                        h for h in filtered_hotels
                        if h.price and h.price <= prefs.max_price_per_night
                    ]
                if prefs.amenities:
                    filtered_hotels = [
                        h for h in filtered_hotels
                        if h.amenities and all(a in h.amenities for a in prefs.amenities)
                    ]
            # Hotels without a price cannot be ranked and are ignored.
            best_hotel = min(
                (h for h in filtered_hotels if h.price is not None),
                key=lambda x: x.price,
                default=None,
            )
        except Exception as e:
            logging.exception("Hotel search error in /trip/plan: %s", e)
            raise HTTPException(status_code=502, detail=f"Hotel search failed: {e}") from e

        # Calculate total cost; a missing flight or hotel contributes 0.
        travellers = req.adults + req.children
        total_flight_cost = best_flight.price * travellers if best_flight and best_flight.price else 0
        # The hotel price is multiplied by the number of nights, i.e. it is
        # treated as a nightly rate.
        total_hotel_cost = (best_hotel.price * nights) if best_hotel and best_hotel.price else 0
        total_estimated_cost = total_flight_cost + total_hotel_cost
        # Undefined for a zero-night stay, so report None instead of dividing by zero.
        per_person_per_day = (
            total_estimated_cost / (travellers * nights)
            if nights > 0 and travellers > 0
            else None
        )
        breakdown = {
            "flight": total_flight_cost,
            "hotel": total_hotel_cost,
            "nights": nights,
            "adults": req.adults,
            "children": req.children,
        }
        suggestions = None
        if req.max_total_budget and total_estimated_cost > req.max_total_budget:
            suggestions = "Consider adjusting your dates, reducing hotel star rating, or increasing your budget."
        return TripPlanResponse(
            best_flight=best_flight,
            best_hotel=best_hotel,
            total_estimated_cost=total_estimated_cost,
            per_person_per_day=per_person_per_day,
            breakdown=breakdown,
            suggestions=suggestions,
        )
    except HTTPException:
        # Already carries the intended status code; pass it through untouched.
        raise
    except Exception as e:
        logging.exception("Trip plan error: %s", e)
        raise HTTPException(status_code=400, detail=str(e)) from e
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment