# Source: GitHub gist tspannhw/6abf0f2ed0c28649e79c8e91a58e5882 by @tspannhw
# (last active August 12, 2024).
# Loads recent OpenAQ air-quality measurements into a Milvus collection and
# runs a sample vector similarity search against it.
from datetime import datetime
from datetime import datetime, timedelta
import numpy
import pprint
import json
import requests
import os
from pymilvus import connections
from pymilvus import utility
from pymilvus import FieldSchema, CollectionSchema, DataType, Collection
from pymilvus import model
from pymilvus.model.dense import SentenceTransformerEmbeddingFunction
from pymilvus import MilvusClient
from dotenv import load_dotenv
# Load KEY=VALUE pairs from a local .env file (if one exists) into the process
# environment, so endpoints and secrets do not have to be edited into source.
load_dotenv(verbose=True)

# Width of the "vector" field; 384 is the embedding size of all-MiniLM-L6-v2.
DIMENSION = 384

# Milvus endpoints. Each can be overridden by the environment variable named
# in the os.getenv() call; the literal is the original placeholder default.
MILVUS_URL = os.getenv("MILVUS_URL", "http://192.168.1.YOURIP:19530")
MILVUS_CLOUD_URL = os.getenv(
    "MILVUS_CLOUD_URL",
    "https://CLOUDSTUFF.serverless.gcp-us-west1.cloud.zilliz.com",
)
COLLECTION_NAME = "airquality"

# AirNow request pieces (base URL + API-key query fragment).
AQ_URL = "https://www.airnowapi.org/aq/observation/zipCode/current/?format=application/json&distance=5000&zipCode="
AQ_KEY = "&API_KEY="

# Secrets: prefer the environment (.env) over literals committed to the file.
API_KEY = os.getenv("AIRNOW_API_KEY", "APIKEYFROMAQ")
TOKEN = os.getenv("ZILLIZ_TOKEN", "ZILLIZTOKENIFUSINGCLOUD")
# -----------------------------------------------------------------------------
# Milvus connection and embedding model
# -----------------------------------------------------------------------------
# Default target: a self-hosted Milvus server (for example local Docker).
milvus_client = MilvusClient(uri=MILVUS_URL)

# Alternative target: Zilliz Cloud, which needs the cloud URI plus a token.
# milvus_client = MilvusClient(uri=MILVUS_CLOUD_URL, token=TOKEN)

# Sentence-transformer used to embed each reading's text description (CPU).
# NOTE: this rebinds `model`, hiding the `pymilvus.model` module imported above.
model = SentenceTransformerEmbeddingFunction('all-MiniLM-L6-v2', device='cpu')
# OpenAQ v3 endpoints, kept for reference:
#   https://api.openaq.org/v3/locations?order_by=id&sort_order=asc&iso=US&limit=100&page=1
#   https://api.openaq.org/v3/parameters?order_by=id&sort_order=asc&limit=1000&page=1
#   https://api.openaq.org/v3/countries?order_by=id&sort_order=asc&limit=1000&page=1

# Name of the Milvus collection that receives the measurements.
OAQ3_COLLECTION_NAME = "openaqmeasurements"

# Query window as YYYY-MM-DD strings in local time: yesterday through today.
_DATE_FORMAT = '%Y-%m-%d'
TODAYS_DATE = datetime.today().strftime(_DATE_FORMAT)
YESTERDAYS_DATE = (datetime.now() - timedelta(days=1)).strftime(_DATE_FORMAT)

# Per-measurement text descriptions; reassigned on every pass of the ingest loop.
details = fulllocation = ""

# Rows accumulated for the bulk insert.
data = []
## schema
# Fixed set of columns only: dynamic fields are disabled.
schema = milvus_client.create_schema(enable_dynamic_field=False)

# (field name, Milvus data type, extra keyword arguments), in column order.
_FIELD_SPECS = (
    ('id', DataType.INT64, {'is_primary': True, 'auto_id': True}),
    ('locationId', DataType.INT32, {}),
    ('location', DataType.VARCHAR, {'max_length': 255}),
    ('parameter', DataType.VARCHAR, {'max_length': 255}),
    ('value', DataType.FLOAT, {}),
    ('datelocal', DataType.VARCHAR, {'max_length': 255}),
    ('unit', DataType.VARCHAR, {'max_length': 255}),
    ('latitude', DataType.FLOAT, {}),
    ('longitude', DataType.FLOAT, {}),
    ('country', DataType.VARCHAR, {'max_length': 255}),
    ('city', DataType.VARCHAR, {'max_length': 255}),
    ('isMobile', DataType.VARCHAR, {'max_length': 12}),
    ('isAnalysis', DataType.VARCHAR, {'max_length': 12}),
    ('entity', DataType.VARCHAR, {'max_length': 255}),
    ('sensorType', DataType.VARCHAR, {'max_length': 255}),
    ('vector', DataType.FLOAT_VECTOR, {'dim': DIMENSION}),
    ('details', DataType.VARCHAR, {'max_length': 8000}),
    ('fulllocation', DataType.VARCHAR, {'max_length': 2000}),
)

# Register the columns one by one, preserving the order of the table above.
for _field_name, _field_type, _field_options in _FIELD_SPECS:
    schema.add_field(field_name=_field_name, datatype=_field_type, **_field_options)
# Index definitions handed to create_collection(): a sorted scalar index on the
# primary key, and an IVF_FLAT vector index using L2 distance with nlist=100.
index_params = milvus_client.prepare_index_params()

_INDEX_SPECS = (
    {"field_name": "id", "index_type": "STL_SORT"},
    {
        "field_name": "vector",
        "index_type": "IVF_FLAT",
        "metric_type": "L2",
        "params": {"nlist": 100},
    },
)
for _index_spec in _INDEX_SPECS:
    index_params.add_index(**_index_spec)
# Create the measurements collection from the prepared schema and indexes.
milvus_client.create_collection(
    collection_name=OAQ3_COLLECTION_NAME,
    schema=schema,
    index_params=index_params,
)

# Print the collection's load state as a quick sanity check.
res = milvus_client.get_load_state(collection_name=OAQ3_COLLECTION_NAME)
print(res)
# Fetch US measurements recorded between yesterday and today from the OpenAQ
# v2 API, newest first. Only page 1 (at most 1000 rows) is requested; walking
# further pages (page=2, ...) is not implemented.
# NOTE(review): OpenAQ has announced retirement of the v2 API in favour of v3,
# which requires an API key -- confirm this endpoint is still being served.
url = 'https://api.openaq.org/v2/measurements?country=US&date_from={0}&date_to={1}&limit=1000&page=1&offset=0&sort=desc&radius=1000&order_by=datetime'.format(YESTERDAYS_DATE, TODAYS_DATE)
headers = {"accept": "application/json"}

# requests waits forever by default; bound the wait (in seconds) so a stalled
# connection cannot hang the script.
response = requests.get(url, headers=headers, timeout=30)

# Surface HTTP errors (4xx/5xx) here with the real status code instead of
# failing later with an opaque KeyError on the missing 'results' key.
response.raise_for_status()

# Decoded JSON body; the ingest step reads its 'results' list.
openaq2 = response.json()
def _text(value):
    """Return *value* as a string, mapping a JSON null (None) to ''.

    dict.get(key, '') only falls back to '' when the key is absent; a key that
    is present with a null value would otherwise be stored as the text 'None'.
    """
    return '' if value is None else str(value)


# Turn every OpenAQ measurement into one Milvus row: the structured columns,
# two human-readable descriptions, and an embedding of the description text.
skipped = 0
for jsonitems in openaq2['results']:
    # 'coordinates' and 'date' are nested objects and may be missing or null.
    coordinates = jsonitems.get('coordinates') or {}
    latitude = coordinates.get('latitude')
    longitude = coordinates.get('longitude')
    date_local = (jsonitems.get('date') or {}).get('local')
    location_id = jsonitems.get('locationId')
    value = jsonitems.get('value')

    # These four feed int()/float() conversions for numeric schema columns, so
    # a record lacking any of them cannot be stored. Skip it rather than abort
    # the whole run on a single incomplete measurement.
    if any(item is None for item in (location_id, value, latitude, longitude)):
        skipped += 1
        continue

    fulllocation = 'Location {0}: {1}, {2}, {3} @ {4},{5}'.format(
        location_id, jsonitems.get('location'), jsonitems.get('city'),
        jsonitems.get('country'), latitude, longitude)

    details = 'Current Air Quality Reading for {0} is {1} {2} for {3} at {4}. Is Mobile: {5} Is Analysis: {6} Entity: {7} Sensor Type: {8}'.format(
        jsonitems.get('parameter'), value, jsonitems.get('unit'),
        fulllocation, date_local,
        jsonitems.get('isMobile'), jsonitems.get('isAnalysis'),
        jsonitems.get('entity'), jsonitems.get('sensorType'))

    data.append({
        "locationId": int(location_id),
        "location": _text(jsonitems.get('location')),
        "parameter": _text(jsonitems.get('parameter')),
        "value": float(value),
        "datelocal": _text(date_local),
        "unit": _text(jsonitems.get('unit')),
        "latitude": float(latitude),
        "longitude": float(longitude),
        "country": _text(jsonitems.get('country')),
        "city": _text(jsonitems.get('city')),
        "isMobile": _text(jsonitems.get('isMobile')),
        "isAnalysis": _text(jsonitems.get('isAnalysis')),
        "entity": _text(jsonitems.get('entity')),
        "sensorType": _text(jsonitems.get('sensorType')),
        # The embedding function takes a list of texts and returns one
        # embedding per text, so wrap the description and take the only result.
        "vector": model.encode_documents([details])[0],
        "details": details,
        "fulllocation": fulllocation,
    })

if skipped:
    print(f"Skipped {skipped} measurement(s) with missing id, value or coordinates.")
# Write all accumulated rows to the collection in a single insert call.
res = milvus_client.insert(
    collection_name=OAQ3_COLLECTION_NAME,
    data=data,
)
# Search settings: L2 distance (the metric the "vector" index is created with)
# and nprobe=10 for the IVF index.
search_params = {
    "metric_type": "L2",
    "params": {"nprobe": 10},
}

if data:
    # Use the first collected row's embedding as the query vector.
    query_vector = [data[0]["vector"]]

    # Return the 5 nearest rows on the 'vector' field with a subset of columns.
    search_results = milvus_client.search(
        OAQ3_COLLECTION_NAME,
        data=query_vector,
        anns_field="vector",
        output_fields=["id", "unit", "country", "latitude", "longitude", "parameter", "value", "city", "details", "fulllocation"],
        search_params=search_params,
        limit=5,
    )
else:
    # Nothing was collected, so there is no row to query with; data[0] would
    # raise IndexError. Keep both names defined so later code still runs.
    query_vector = []
    search_results = []
    print("No measurements were collected; skipping the similarity search.")
# Print every hit of every result set (one result set per query vector).
for hit in (entry for hits in search_results for entry in hits):
    print(f"Hit: {hit}")
# End of gist (GitHub comment-thread footer omitted).