Skip to content

Instantly share code, notes, and snippets.

@databyjp
Created October 26, 2025 13:01
Show Gist options
  • Select an option

  • Save databyjp/57f38cebb2dc9d2af8ccb454a2642a46 to your computer and use it in GitHub Desktop.

Select an option

Save databyjp/57f38cebb2dc9d2af8ccb454a2642a46 to your computer and use it in GitHub Desktop.
"""
End-to-End Weaviate Example: E-Commerce Product Collection
This script demonstrates:
1. Connecting to Weaviate Cloud
2. Creating a collection with RQ quantization and alias
3. Importing data with batching
4. Querying with the traditional Weaviate client
5. Querying with the Query Agent
"""
import os
import json
from typing import List, Dict, Any
import weaviate
from weaviate.classes.config import (
Configure,
Property,
DataType,
)
from weaviate.classes.init import Auth
from weaviate.classes.query import Filter
import dotenv
# Load settings from a local .env file before any environment lookups below;
# override=True lets values from the file replace variables that are already set.
dotenv.load_dotenv(override=True)
# ============================================================================
# SETUP: Weaviate Cloud Connection
# ============================================================================
def connect_to_weaviate_cloud() -> weaviate.WeaviateClient:
    """
    Connect to a Weaviate Cloud instance using credentials from the environment.

    Environment variables read:
        - WEAVIATE_URL: Your cluster URL (e.g., https://your-cluster.weaviate.cloud)
        - WEAVIATE_API_KEY: Your admin API key
        - OPENAI_API_KEY: For the text2vec-openai vectorizer. When it is missing
          the connection is still attempted, but a warning is printed and no
          OpenAI header is sent.

    Returns:
        The client object returned by ``weaviate.connect_to_weaviate_cloud``.

    Raises:
        ValueError: If WEAVIATE_URL or WEAVIATE_API_KEY is unset or empty.
    """
    weaviate_url = os.environ.get("WEAVIATE_URL")
    weaviate_api_key = os.environ.get("WEAVIATE_API_KEY")
    openai_api_key = os.environ.get("OPENAI_API_KEY")

    if not weaviate_url or not weaviate_api_key:
        raise ValueError(
            "Please set WEAVIATE_URL and WEAVIATE_API_KEY environment variables"
        )

    # Only send the OpenAI header when a key is actually configured: a header
    # whose value is None is not a valid HTTP header value.
    headers = {}
    if openai_api_key:
        headers["X-OpenAI-Api-Key"] = openai_api_key
    else:
        print(
            "⚠ Warning: OPENAI_API_KEY is not set; "
            "operations that need the text2vec-openai vectorizer may fail"
        )

    client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_url,
        auth_credentials=Auth.api_key(weaviate_api_key),
        headers=headers or None,
    )
    print("✓ Connected to Weaviate Cloud")
    return client
# ============================================================================
# COLLECTION SETUP: Create E-Commerce Collection
# ============================================================================
def create_ecommerce_collection(client: weaviate.WeaviateClient) -> str:
    """
    Create an e-commerce product collection with:
    - 8-bit RQ quantization (recommended)
    - Collection alias
    - Proper schema with properties

    Any existing "products" alias and "ECommerceProduct" collection are removed
    first, so the function can be re-run for testing.

    Args:
        client: Connected Weaviate client.

    Returns:
        The name of the collection that was created ("ECommerceProduct").
    """
    collection_name = "ECommerceProduct"
    alias_name = "products"

    # Delete if exists (for testing purposes)
    try:
        # Try to delete the alias - note: may already exist from previous runs
        client.alias.delete(alias_name=alias_name)
        print(f"Deleted existing alias: {alias_name}")
    except Exception as e:
        # Best-effort cleanup: a missing alias is expected on a first run.
        # Any other failure is reported but not fatal, because the alias is
        # (re)created further down.
        if "does not exist" not in str(e).lower():
            print(f"Note: Alias deletion - {e}")

    if client.collections.exists(collection_name):
        print(f"Deleting existing collection: {collection_name}")
        client.collections.delete(collection_name)

    # Create collection with RQ quantization
    client.collections.create(
        name=collection_name,
        description="E-commerce product collection with 8-bit RQ quantization",
        # Configure vector with 8-bit RQ quantization (recommended for most use cases)
        vector_config=Configure.Vectors.text2vec_openai(
            name="default",
            source_properties=["name", "description"],
            quantizer=Configure.VectorIndex.Quantizer.rq(bits=8),
        ),
        properties=[
            Property(
                name="name",
                data_type=DataType.TEXT,
                description="Product name",
            ),
            Property(
                name="description",
                data_type=DataType.TEXT,
                description="Product description",
            ),
            Property(
                name="price",
                data_type=DataType.NUMBER,
                description="Product price",
            ),
            Property(
                name="category",
                data_type=DataType.TEXT,
                description="Product category",
            ),
            Property(
                name="inStock",
                data_type=DataType.BOOL,
                description="Whether product is in stock",
            ),
            Property(
                name="rating",
                data_type=DataType.NUMBER,
                description="Product rating (0-5)",
            ),
        ],
    )
    print(f"✓ Created collection: {collection_name}")

    # Add alias to collection. A failure here is reported, not raised.
    try:
        client.alias.create(alias_name=alias_name, target_collection=collection_name)
        print(f"✓ Created alias: {alias_name} -> {collection_name}")
    except Exception as e:
        print(f"Note: Alias creation - {e}")

    return collection_name
# ============================================================================
# DATA IMPORT: Sample E-Commerce Data with Batching
# ============================================================================
def generate_sample_products() -> List[Dict[str, Any]]:
    """
    Build the fixed demo catalogue of eight e-commerce products.

    Returns:
        A new list of dicts, each with the keys ``name``, ``description``,
        ``price``, ``category``, ``inStock`` and ``rating`` (in that order).
    """
    field_names = ("name", "description", "price", "category", "inStock", "rating")

    # One tuple per product, values in the same order as ``field_names``.
    rows = (
        (
            "Wireless Noise-Canceling Headphones",
            "Premium quality headphones with active noise cancellation, 30-hour battery, Bluetooth 5.0",
            299.99,
            "Electronics",
            True,
            4.8,
        ),
        (
            "Stainless Steel Water Bottle",
            "Insulated water bottle keeps drinks cold for 24 hours or hot for 12 hours. Durable and eco-friendly",
            34.99,
            "Sports",
            True,
            4.5,
        ),
        (
            "Organic Cotton T-Shirt",
            "Comfortable 100% organic cotton t-shirt. Sustainable fashion, available in multiple colors",
            24.99,
            "Apparel",
            True,
            4.3,
        ),
        (
            "USB-C Fast Charging Cable",
            "High-speed USB-C cable, 6ft length, supports fast charging up to 100W, durable nylon braiding",
            12.99,
            "Electronics",
            True,
            4.6,
        ),
        (
            "Yoga Mat Non-Slip",
            "Premium 6mm thick yoga mat with non-slip surface. Perfect for home or gym workouts",
            45.99,
            "Sports",
            True,
            4.7,
        ),
        (
            "Bamboo Cutting Board Set",
            "Set of 3 bamboo cutting boards, naturally antimicrobial, sustainable kitchen essential",
            39.99,
            "Home",
            False,
            4.4,
        ),
        (
            "LED Desk Lamp",
            "Adjustable LED desk lamp with 5 brightness levels, USB charging port, modern design",
            49.99,
            "Electronics",
            True,
            4.5,
        ),
        (
            "Wireless Mouse",
            "Ergonomic wireless mouse with precision tracking, silent clicks, 18-month battery life",
            29.99,
            "Electronics",
            True,
            4.4,
        ),
    )
    return [dict(zip(field_names, row)) for row in rows]
def import_products_with_batching(
    client: weaviate.WeaviateClient,
    collection_name: str,
    products: List[Dict[str, Any]],
    batch_size: int = 5,
) -> None:
    """
    Load products into Weaviate through a fixed-size client-side batch.

    The target is always looked up through the "products" alias;
    ``collection_name`` is accepted but not used by this function.

    Args:
        client: Weaviate client instance
        collection_name: Name of the underlying collection (currently unused)
        products: List of product dictionaries
        batch_size: Number of objects per batch (default: 5)
    """
    # Resolve the import target through the alias rather than the real name
    target = client.collections.get("products")
    total = len(products)
    print(f"\nImporting {total} products with batch_size={batch_size}...")

    # Fixed-size batching with two parallel requests for a controlled import
    with target.batch.fixed_size(
        batch_size=batch_size,
        concurrent_requests=2,
    ) as writer:
        for position, item in enumerate(products, start=1):
            writer.add_object(properties=item)
            print(f" Queued product {position}/{total}: {item['name']}")

    # Report import failures (at most the first three are shown)
    failures = target.batch.failed_objects
    if not failures:
        print("✓ All products imported successfully")
    else:
        print(f"\n⚠ Warning: {len(failures)} objects failed to import")
        for failure in failures[:3]:
            print(f" Failed: {failure}")

    # Confirm how many objects the collection now holds
    summary = target.aggregate.over_all(total_count=True)
    print(f"✓ Total objects in collection: {summary.total_count}")
# ============================================================================
# QUERYING: Traditional Weaviate Client
# ============================================================================
def query_with_weaviate_client(client: weaviate.WeaviateClient, collection_name: str) -> None:
    """
    Run three example queries with the regular Weaviate client and print the hits.

    1. Semantic search restricted to products priced below 50.
    2. Semantic search combined with category, rating and stock filters.
    3. A pure filter query for the "Sports" category.

    All queries go through the "products" alias; ``collection_name`` is
    accepted but not used by this function.
    """
    rule = "-" * 70

    def _banner(title: str) -> None:
        # Section header: blank line, rule of '=', title, rule of '='
        print("\n" + "=" * 70)
        print(title)
        print("=" * 70)

    # --- Query 1: semantic search for electronics under $50 -----------------
    _banner("QUERY 1: Semantic Search with Weaviate Client")
    # Query using the alias instead of the collection name
    products = client.collections.get("products")
    search_text = "affordable tech gadgets"
    price_cap = Filter.by_property("price").less_than(50)
    hits = products.query.near_text(query=search_text, limit=3, filters=price_cap)
    print(f"Query: '{search_text}' (price < $50)")
    print(rule)
    for hit in hits.objects:
        fields = hit.properties
        print(f" • {fields['name']}")
        print(f" Price: ${fields['price']} | Rating: {fields['rating']}")
        print(f" Category: {fields['category']}")
        print()

    # --- Query 2: highly rated, in-stock products in Electronics ------------
    _banner("QUERY 2: Semantic Search with Multiple Filters")
    search_text = "premium electronics"
    combined = Filter.all_of([
        Filter.by_property("category").equal("Electronics"),
        Filter.by_property("rating").greater_or_equal(4.5),
        Filter.by_property("inStock").equal(True),
    ])
    hits = products.query.near_text(query=search_text, limit=4, filters=combined)
    print(f"Query: '{search_text}' (Electronics, rating >= 4.5, in stock)")
    print(rule)
    for hit in hits.objects:
        fields = hit.properties
        print(f" • {fields['name']}")
        print(f" Price: ${fields['price']} | Rating: {fields['rating']}")
        print()

    # --- Query 3: every product in the Sports category ----------------------
    _banner("QUERY 3: Filter-based Query")
    sports_only = Filter.by_property("category").equal("Sports")
    hits = products.query.fetch_objects(limit=10, filters=sports_only)
    print("All Sports Category Products:")
    print(rule)
    for hit in hits.objects:
        fields = hit.properties
        print(f" • {fields['name']} - ${fields['price']}")
# ============================================================================
# QUERYING: Weaviate Query Agent
# ============================================================================
def query_with_query_agent(client: weaviate.WeaviateClient) -> None:
    """
    Ask a few natural-language questions through the Weaviate Query Agent.

    The agent is pointed at the "products" collection/alias and given a
    shopping-assistant system prompt. Each answer is printed with the
    response's own ``display()`` method; an error on a single question is
    printed and the loop carries on. If the agent package cannot be imported,
    installation instructions are printed instead.
    """
    print("\n" + "=" * 70)
    print("QUERY 4: Natural Language Query with Query Agent")
    print("=" * 70)

    try:
        from weaviate.agents.query import QueryAgent

        # The agent accepts either the alias or the collection name
        agent = QueryAgent(
            client=client,
            collections=["products"],
            system_prompt="You are a helpful shopping assistant. Provide clear and concise product recommendations.",
        )

        # Sample questions phrased the way a shopper might ask them
        questions = (
            "What are the most expensive electronic products?",
            "Show me products under $50 that are highly rated",
            "What sports items are currently in stock?",
        )
        for number, question in enumerate(questions, start=1):
            print(f"\nQuery {number}: '{question}'")
            print("-" * 70)
            try:
                answer = agent.ask(question)
                # Let the response object render itself
                answer.display()
            except Exception as err:
                print(f"Error: {err}")
            print()
    except ImportError:
        print("\nNote: QueryAgent requires weaviate-agents library")
        print("Install it with: pip install weaviate-agents")
        print("\nThe Query Agent would automatically handle natural language queries")
        print("without needing to specify collection names or query types.")
# ============================================================================
# AGGREGATION: Advanced Querying
# ============================================================================
def demonstrate_aggregations(client: weaviate.WeaviateClient, collection_name: str) -> None:
    """
    Demonstrate aggregation queries to get statistics about the products.

    Prints the number of products per category, then the total product count
    and the average rating. The "products" alias is queried;
    ``collection_name`` is accepted but not used by this function.

    Args:
        client: Connected Weaviate client.
        collection_name: Name of the underlying collection (currently unused).
    """
    # Local import keeps this block self-contained; Metrics lives in the same
    # module the file already imports Filter from.
    from weaviate.classes.query import Metrics

    print("\n" + "=" * 70)
    print("QUERY 5: Aggregation - Product Statistics")
    print("=" * 70)

    # Use the alias for aggregations
    collection = client.collections.get("products")

    # Get count of products by category
    print("\nProducts by Category:")
    print("-" * 70)
    response = collection.aggregate.over_all(
        group_by="category",
        total_count=True,
    )
    for group in response.groups:
        print(f" • {group.grouped_by.value}: {group.total_count} products")

    # Get total count and average rating in a single aggregation request
    print("\nProduct Statistics:")
    print("-" * 70)
    response = collection.aggregate.over_all(
        total_count=True,
        return_metrics=Metrics("rating").number(mean=True),
    )
    print(f" Total products: {response.total_count}")
    average_rating = response.properties["rating"].mean
    # Skip the line when no mean is reported (e.g. nothing to average over)
    if average_rating is not None:
        print(f" Average rating: {average_rating:.2f}")
# ============================================================================
# MAIN EXECUTION
# ============================================================================
def main() -> None:
    """
    Execute the complete end-to-end workflow.

    Connects to Weaviate Cloud, (re)creates the collection, imports the sample
    products, runs the client queries, the aggregations and the Query Agent
    demo. The client connection is closed even when one of the steps raises.
    """
    print("\n" + "=" * 70)
    print("WEAVIATE END-TO-END EXAMPLE: E-Commerce Product Collection")
    print("=" * 70)

    # Step 1: Connect to Weaviate Cloud
    print("\nStep 1: Connecting to Weaviate Cloud...")
    client = connect_to_weaviate_cloud()

    try:
        # Step 2: Create collection with RQ quantization
        print("\nStep 2: Creating E-Commerce Collection with RQ Quantization...")
        collection_name = create_ecommerce_collection(client)

        # Step 3: Generate and import sample data with batching
        print("\nStep 3: Generating and Importing Sample Products...")
        products = generate_sample_products()
        import_products_with_batching(client, collection_name, products, batch_size=5)

        # Step 4: Query with traditional Weaviate client
        print("\nStep 4: Querying with Traditional Weaviate Client...")
        query_with_weaviate_client(client, collection_name)

        # Step 5: Demonstrate aggregations
        print("\nStep 5: Aggregation Queries...")
        demonstrate_aggregations(client, collection_name)

        # Step 6: Query with Query Agent
        print("\nStep 6: Querying with Weaviate Query Agent...")
        query_with_query_agent(client)
    finally:
        # Close connection, even if a step above failed, so it is not leaked
        client.close()

    print("\n" + "=" * 70)
    print("✓ Example completed successfully!")
    print("=" * 70 + "\n")
# Run the workflow only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment