Skip to content

Instantly share code, notes, and snippets.

@jongan69
Created June 24, 2025 04:00
Show Gist options
  • Save jongan69/7273eeca6faa85a933bd1d429d9a71a3 to your computer and use it in GitHub Desktop.
Save jongan69/7273eeca6faa85a933bd1d429d9a71a3 to your computer and use it in GitHub Desktop.
Damn you Google, damn you
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from fast_hotels import HotelData, Guests, get_hotels
from fast_flights import FlightData, Passengers, get_flights
import logging
import re
import pandas as pd
# Helper functions for parsing price and stops
def _parse_price(price):
if isinstance(price, str):
# Remove any non-numeric characters except dot and comma
match = re.search(r"[\d,.]+", price)
if match:
# Remove commas and convert to float
return float(match.group(0).replace(",", ""))
return None
return price
def _parse_stops(stops):
try:
if stops is None:
return None
if isinstance(stops, int):
return stops
# Try to convert to int if it's a string representation of a number
return int(stops)
except (ValueError, TypeError):
return None
app = FastAPI()
logging.basicConfig(level=logging.INFO)
# Hotel search models
class HotelSearchRequest(BaseModel):
checkin_date: str
checkout_date: str
location: str
adults: int = Field(..., ge=1)
children: int = Field(0, ge=0)
fetch_mode: str = Field("live", description="'live' for scraping, 'local' for mock data")
limit: int = Field(3, ge=1)
debug: bool = False
class HotelInfo(BaseModel):
name: str
price: Optional[float]
rating: Optional[float]
url: Optional[str]
amenities: Optional[List[str]]
class HotelSearchResponse(BaseModel):
hotels: List[HotelInfo]
lowest_price: Optional[float]
current_price: Optional[float]
# Flight search models
class FlightSearchRequest(BaseModel):
date: str
from_airport: str
to_airport: str
trip: str = Field("one-way", description="'one-way' or 'round-trip'")
seat: str = Field("economy", description="'economy', 'business', etc.")
adults: int = Field(..., ge=1)
children: int = Field(0, ge=0)
infants_in_seat: int = Field(0, ge=0)
infants_on_lap: int = Field(0, ge=0)
fetch_mode: str = Field("fallback", description="'fallback', 'live', etc.")
class FlightInfo(BaseModel):
name: str
departure: str
arrival: str
arrival_time_ahead: Optional[str]
duration: Optional[str]
stops: Optional[int]
delay: Optional[str]
price: Optional[float]
is_best: Optional[bool]
class FlightSearchResponse(BaseModel):
flights: List[FlightInfo]
current_price: Optional[str]
class HotelPreferences(BaseModel):
star_rating: Optional[int] = None
max_price_per_night: Optional[float] = None
amenities: Optional[list[str]] = None
class TripPlanRequest(BaseModel):
origin: str
destination: str
depart_date: str
return_date: str
adults: int = Field(..., ge=1)
children: int = Field(0, ge=0)
hotel_preferences: Optional[HotelPreferences] = None
max_total_budget: Optional[float] = None
class TripPlanResponse(BaseModel):
best_flight: Optional[FlightInfo]
best_hotel: Optional[HotelInfo]
total_estimated_cost: Optional[float]
per_person_per_day: Optional[float]
breakdown: Dict[str, Any]
suggestions: Optional[str]
@app.post("/hotels/search", response_model=HotelSearchResponse)
def search_hotels(req: HotelSearchRequest):
try:
hotel_data = [HotelData(
checkin_date=req.checkin_date,
checkout_date=req.checkout_date,
location=req.location
)]
guests = Guests(adults=req.adults, children=req.children)
result = get_hotels(
hotel_data=hotel_data,
guests=guests,
fetch_mode=req.fetch_mode,
debug=req.debug,
limit=req.limit
)
hotels = [HotelInfo(
name=h.name,
price=getattr(h, 'price', None),
rating=getattr(h, 'rating', None),
url=getattr(h, 'url', None),
amenities=getattr(h, 'amenities', None)
) for h in result.hotels]
return HotelSearchResponse(
hotels=hotels,
lowest_price=getattr(result, 'lowest_price', None),
current_price=getattr(result, 'current_price', None)
)
except Exception as e:
logging.error(f"Hotel search error: {e}")
raise HTTPException(status_code=400, detail=str(e))
@app.post("/flights/search", response_model=FlightSearchResponse)
def search_flights(req: FlightSearchRequest):
try:
flight_data = [FlightData(
date=req.date,
from_airport=req.from_airport,
to_airport=req.to_airport
)]
passengers = Passengers(
adults=req.adults,
children=req.children,
infants_in_seat=req.infants_in_seat,
infants_on_lap=req.infants_on_lap
)
result = get_flights(
flight_data=flight_data,
trip=req.trip,
seat=req.seat,
passengers=passengers,
fetch_mode=req.fetch_mode
)
flights = [FlightInfo(
name=f.name,
departure=f.departure,
arrival=f.arrival,
arrival_time_ahead=getattr(f, 'arrival_time_ahead', None),
duration=getattr(f, 'duration', None),
stops=_parse_stops(getattr(f, 'stops', None)),
delay=getattr(f, 'delay', None),
price=_parse_price(getattr(f, 'price', None)),
is_best=getattr(f, 'is_best', None)
) for f in result.flights]
return FlightSearchResponse(
flights=flights,
current_price=getattr(result, 'current_price', None)
)
except Exception as e:
logging.error(f"Flight search error: {e}")
raise HTTPException(status_code=400, detail=str(e))
@app.post("/trip/plan", response_model=TripPlanResponse)
def plan_trip(req: TripPlanRequest):
try:
# Search for flights
try:
flight_data = [FlightData(
date=req.depart_date,
from_airport=req.origin,
to_airport=req.destination
)]
passengers = Passengers(
adults=req.adults,
children=req.children,
infants_in_seat=0,
infants_on_lap=0
)
flight_result = get_flights(
flight_data=flight_data,
trip="one-way",
seat="economy",
passengers=passengers,
fetch_mode="local"
)
flights = [
FlightInfo(
name=f.name,
departure=f.departure,
arrival=f.arrival,
arrival_time_ahead=getattr(f, 'arrival_time_ahead', None),
duration=getattr(f, 'duration', None),
stops=_parse_stops(getattr(f, 'stops', None)),
delay=getattr(f, 'delay', None),
price=_parse_price(getattr(f, 'price', None)),
is_best=getattr(f, 'is_best', None)
) for f in flight_result.flights
]
best_flight = min((f for f in flights if f.price is not None), key=lambda x: x.price, default=None)
except Exception as e:
logging.error(f"Flight search error in /trip/plan: {e}")
raise HTTPException(status_code=502, detail=f"Flight search failed: {e}")
# Search for hotels
try:
hotel_data = [HotelData(
checkin_date=req.depart_date,
checkout_date=req.return_date,
location=req.destination
)]
guests = Guests(adults=req.adults, children=req.children)
hotel_result = get_hotels(
hotel_data=hotel_data,
guests=guests,
fetch_mode="live",
debug=False,
limit=10
)
hotels = [
HotelInfo(
name=h.name,
price=getattr(h, 'price', None),
rating=getattr(h, 'rating', None),
url=getattr(h, 'url', None),
amenities=getattr(h, 'amenities', None)
) for h in hotel_result.hotels
]
# Filter hotels by preferences
filtered_hotels = hotels
if req.hotel_preferences:
if req.hotel_preferences.star_rating:
filtered_hotels = [h for h in filtered_hotels if h.rating and h.rating >= req.hotel_preferences.star_rating]
if req.hotel_preferences.max_price_per_night:
filtered_hotels = [h for h in filtered_hotels if h.price and h.price <= req.hotel_preferences.max_price_per_night]
if req.hotel_preferences.amenities:
filtered_hotels = [h for h in filtered_hotels if h.amenities and all(a in h.amenities for a in req.hotel_preferences.amenities)]
best_hotel = min((h for h in filtered_hotels if h.price is not None), key=lambda x: x.price, default=None)
except Exception as e:
logging.error(f"Hotel search error in /trip/plan: {e}")
raise HTTPException(status_code=502, detail=f"Hotel search failed: {e}")
# Calculate total cost
total_flight_cost = best_flight.price * (req.adults + req.children) if best_flight and best_flight.price else 0
nights = (pd.to_datetime(req.return_date) - pd.to_datetime(req.depart_date)).days
total_hotel_cost = (best_hotel.price * nights) if best_hotel and best_hotel.price else 0
total_estimated_cost = total_flight_cost + total_hotel_cost
per_person_per_day = total_estimated_cost / ((req.adults + req.children) * nights) if nights > 0 and (req.adults + req.children) > 0 else None
breakdown = {
"flight": total_flight_cost,
"hotel": total_hotel_cost,
"nights": nights,
"adults": req.adults,
"children": req.children
}
suggestions = None
if req.max_total_budget and total_estimated_cost > req.max_total_budget:
suggestions = "Consider adjusting your dates, reducing hotel star rating, or increasing your budget."
return TripPlanResponse(
best_flight=best_flight,
best_hotel=best_hotel,
total_estimated_cost=total_estimated_cost,
per_person_per_day=per_person_per_day,
breakdown=breakdown,
suggestions=suggestions
)
except HTTPException as e:
raise e
except Exception as e:
logging.error(f"Trip plan error: {e}")
raise HTTPException(status_code=400, detail=str(e))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment