Last active
August 12, 2024 23:26
-
-
Save tspannhw/6abf0f2ed0c28649e79c8e91a58e5882 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from datetime import datetime | |
| from datetime import datetime, timedelta | |
| import numpy | |
| import pprint | |
| import json | |
| import requests | |
| import os | |
| from pymilvus import connections | |
| from pymilvus import utility | |
| from pymilvus import FieldSchema, CollectionSchema, DataType, Collection | |
| from pymilvus import model | |
| from pymilvus.model.dense import SentenceTransformerEmbeddingFunction | |
| from pymilvus import MilvusClient | |
| from dotenv import load_dotenv | |
# Load KEY=value pairs from a local .env file (if present) into os.environ.
load_dotenv(verbose=True)

# Embedding width of the all-MiniLM-L6-v2 sentence-transformer; must equal the
# `dim` of the collection's "vector" field.
DIMENSION = 384

# Connection settings. Each value can be supplied via the environment (or the
# .env file loaded above); the placeholder string is only a fallback, so real
# hosts and credentials no longer need to be edited into the source.
MILVUS_URL = os.getenv("MILVUS_URL", "http://192.168.1.YOURIP:19530")
MILVUS_CLOUD_URL = os.getenv(
    "MILVUS_CLOUD_URL",
    "https://CLOUDSTUFF.serverless.gcp-us-west1.cloud.zilliz.com",
)
TOKEN = os.getenv("ZILLIZ_TOKEN", "ZILLIZTOKENIFUSINGCLOUD")

# AirNow (airnowapi.org) settings. NOTE(review): COLLECTION_NAME, AQ_URL,
# AQ_KEY and API_KEY are not used by the OpenAQ code in this file.
COLLECTION_NAME = "airquality"
AQ_URL = "https://www.airnowapi.org/aq/observation/zipCode/current/?format=application/json&distance=5000&zipCode="
AQ_KEY = "&API_KEY="
API_KEY = os.getenv("AIRNOW_API_KEY", "APIKEYFROMAQ")
# -----------------------------------------------------------------------------
# Connect to Milvus.
# Default: the local Docker server at MILVUS_URL.
# Set USE_ZILLIZ_CLOUD to any non-empty value to connect to Zilliz Cloud
# instead; the cloud endpoint needs MILVUS_CLOUD_URL together with TOKEN.
if os.getenv("USE_ZILLIZ_CLOUD"):
    milvus_client = MilvusClient(uri=MILVUS_CLOUD_URL, token=TOKEN)
else:
    milvus_client = MilvusClient(uri=MILVUS_URL)

# Sentence-transformer used to embed each reading's text description, run on CPU.
# NOTE: this rebinds the name `model`, shadowing the `pymilvus.model` module
# imported at the top of the file.
model = SentenceTransformerEmbeddingFunction('all-MiniLM-L6-v2', device='cpu')
# OpenAQ v3 reference endpoints (the measurements request in this script uses
# the v2 API):
# https://api.openaq.org/v3/locations?order_by=id&sort_order=asc&iso=US&limit=100&page=1
# https://api.openaq.org/v3/parameters?order_by=id&sort_order=asc&limit=1000&page=1
# https://api.openaq.org/v3/countries?order_by=id&sort_order=asc&limit=1000&page=1
OAQ3_COLLECTION_NAME = "openaqmeasurements"

# Read the clock once so "today" and "yesterday" always come from the same
# instant; two separate clock reads could straddle midnight.
_now = datetime.now()
TODAYS_DATE = _now.strftime('%Y-%m-%d')
YESTERDAYS_DATE = (_now - timedelta(days=1)).strftime('%Y-%m-%d')

details = ""
fulllocation = ""
# Rows accumulated for insertion into OAQ3_COLLECTION_NAME.
data = []
# Collection schema: fixed fields only (dynamic fields disabled). `id` is an
# auto-generated INT64 primary key; `vector` stores the DIMENSION-wide
# embedding of the `details` text.
schema = milvus_client.create_schema(enable_dynamic_field=False)

# (field name, Milvus data type, extra add_field keyword arguments), in the
# exact order the fields are added to the schema.
_OAQ_FIELDS = (
    ("id", DataType.INT64, {"is_primary": True, "auto_id": True}),
    ("locationId", DataType.INT32, {}),
    ("location", DataType.VARCHAR, {"max_length": 255}),
    ("parameter", DataType.VARCHAR, {"max_length": 255}),
    ("value", DataType.FLOAT, {}),
    ("datelocal", DataType.VARCHAR, {"max_length": 255}),
    ("unit", DataType.VARCHAR, {"max_length": 255}),
    ("latitude", DataType.FLOAT, {}),
    ("longitude", DataType.FLOAT, {}),
    ("country", DataType.VARCHAR, {"max_length": 255}),
    ("city", DataType.VARCHAR, {"max_length": 255}),
    ("isMobile", DataType.VARCHAR, {"max_length": 12}),
    ("isAnalysis", DataType.VARCHAR, {"max_length": 12}),
    ("entity", DataType.VARCHAR, {"max_length": 255}),
    ("sensorType", DataType.VARCHAR, {"max_length": 255}),
    ("vector", DataType.FLOAT_VECTOR, {"dim": DIMENSION}),
    ("details", DataType.VARCHAR, {"max_length": 8000}),
    ("fulllocation", DataType.VARCHAR, {"max_length": 2000}),
)

for _field_name, _dtype, _extra in _OAQ_FIELDS:
    schema.add_field(field_name=_field_name, datatype=_dtype, **_extra)
# Indexes: a sorted scalar index on the primary key, and an IVF_FLAT index
# (100 clusters, L2 distance) on the embedding vector.
index_params = milvus_client.prepare_index_params()
index_params.add_index(field_name="id", index_type="STL_SORT")
index_params.add_index(
    field_name="vector",
    index_type="IVF_FLAT",
    metric_type="L2",
    params={"nlist": 100},
)

# The script fetches a rolling date window and is meant to be re-run, so only
# create the collection the first time; on later runs make sure the existing
# collection is loaded so the similarity search below can query it.
if not milvus_client.has_collection(collection_name=OAQ3_COLLECTION_NAME):
    milvus_client.create_collection(
        collection_name=OAQ3_COLLECTION_NAME,
        schema=schema,
        index_params=index_params,
    )
else:
    milvus_client.load_collection(collection_name=OAQ3_COLLECTION_NAME)

res = milvus_client.get_load_state(collection_name=OAQ3_COLLECTION_NAME)
print(res)
# Fetch up to 1000 US measurements dated between yesterday and today from the
# OpenAQ v2 /measurements endpoint.
# TODO: iterate `page` (2, 3, ...) to collect more than the first page.
url = 'https://api.openaq.org/v2/measurements'
params = {
    "country": "US",
    "date_from": YESTERDAYS_DATE,
    "date_to": TODAYS_DATE,
    "limit": 1000,
    "page": 1,
    "offset": 0,
    "sort": "desc",
    "radius": 1000,
    "order_by": "datetime",
}
headers = {"accept": "application/json"}

# The timeout stops a stalled connection from hanging the script forever, and
# raise_for_status() reports HTTP errors directly instead of letting them
# surface later as a KeyError on 'results'.
response = requests.get(url, params=params, headers=headers, timeout=30)
response.raise_for_status()
openaq2 = response.json()
def _text_or_empty(value):
    """Return ``str(value)``, or ``""`` when the value is ``None``.

    Keeps JSON nulls from being stored as the literal text "None" in the
    collection's VARCHAR fields.
    """
    return "" if value is None else str(value)


def _measurement_row(item):
    """Build one Milvus row (everything except ``vector``) from an OpenAQ result.

    :param item: one element of the ``results`` list of the measurements response.
    :returns: a dict keyed by the collection's field names, or ``None`` when
        ``locationId``, ``value`` or either coordinate is missing/null. Those
        fields are numeric in the schema and cannot be stored empty.
    """
    coordinates = item.get('coordinates') or {}
    latitude = coordinates.get('latitude')
    longitude = coordinates.get('longitude')
    if None in (item.get('locationId'), item.get('value'), latitude, longitude):
        return None
    datelocal = (item.get('date') or {}).get('local')

    fulllocation = 'Location {0}: {1}, {2}, {3} @ {4},{5}'.format(
        item.get('locationId'), item.get('location'), item.get('city'),
        item.get('country'), latitude, longitude)
    # Human-readable description; this is the text that gets embedded.
    details = (
        'Current Air Quality Reading for {0} is {1} {2} for {3} at {4}. '
        'Is Mobile: {5} Is Analysis: {6} Entity: {7} Sensor Type: {8}'
    ).format(
        item.get('parameter'), item.get('value'), item.get('unit'),
        fulllocation, datelocal,
        item.get('isMobile'), item.get('isAnalysis'),
        item.get('entity'), item.get('sensorType'))

    return {
        "locationId": int(item['locationId']),
        "location": _text_or_empty(item.get('location')),
        "parameter": _text_or_empty(item.get('parameter')),
        "value": float(item['value']),
        "datelocal": _text_or_empty(datelocal),
        "unit": _text_or_empty(item.get('unit')),
        "latitude": float(latitude),
        "longitude": float(longitude),
        "country": _text_or_empty(item.get('country')),
        "city": _text_or_empty(item.get('city')),
        "isMobile": _text_or_empty(item.get('isMobile')),
        "isAnalysis": _text_or_empty(item.get('isAnalysis')),
        "entity": _text_or_empty(item.get('entity')),
        "sensorType": _text_or_empty(item.get('sensorType')),
        "details": details,
        "fulllocation": fulllocation,
    }


_skipped = 0
for jsonitems in openaq2['results']:
    row = _measurement_row(jsonitems)
    if row is None:
        # Skip an incomplete reading instead of aborting the whole run.
        _skipped += 1
        continue
    data.append(row)
if _skipped:
    print(f"Skipped {_skipped} measurement(s) missing location id, value or coordinates")

# Embed all descriptions in one batched call instead of one model call per
# row. The embedding function takes a list of texts and returns one vector per
# text; convert each to plain Python floats for insertion.
if data:
    embeddings = model([row["details"] for row in data])
    for row, embedding in zip(data, embeddings):
        row["vector"] = [float(x) for x in embedding]
# Bulk-insert every prepared row in a single call; the `id` primary key is
# auto-generated (auto_id=True in the schema). `res` keeps the client's
# insert summary.
res = milvus_client.insert(
    collection_name=OAQ3_COLLECTION_NAME,
    data=data,
)
# Similarity search: use the first row's embedding as the query and print its
# 5 nearest neighbours by L2 distance (probing 10 IVF clusters).
if data:
    search_params = {
        "metric_type": "L2",
        "params": {"nprobe": 10}
    }
    query_vector = [data[0]["vector"]]
    # Strong consistency makes the rows inserted a moment ago visible to this
    # search; under the default level they may not be searchable yet.
    search_results = milvus_client.search(
        OAQ3_COLLECTION_NAME,
        data=query_vector,
        anns_field="vector",
        output_fields=["id", "unit", "country", "latitude", "longitude", "parameter", "value", "city", "details", "fulllocation"],
        search_params=search_params,
        limit=5,
        consistency_level="Strong",
    )
    # One result list per query vector; each hit is a matched row.
    for hits in search_results:
        for hit in hits:
            print(f"Hit: {hit}")
else:
    # Nothing was fetched/kept, so there is no query vector (data[0] would fail).
    print("No measurements to search; skipping the similarity search.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment