Skip to content

Instantly share code, notes, and snippets.

@amercader
Last active October 11, 2024 10:07
Show Gist options
  • Save amercader/2f0f54e1fcf33bad5b7d8b10aa2a10f8 to your computer and use it in GitHub Desktop.
Save amercader/2f0f54e1fcf33bad5b7d8b10aa2a10f8 to your computer and use it in GitHub Desktop.
CKAN Pluggable search backend prototype
# This will eventually live in ckan/logic/action/get.py
def search(context, data_dict):
    """Run a search through the configured search backend.

    Keys of ``data_dict`` that are declared in the search query schema are
    passed to the backend's ``search_query`` as keyword arguments; every
    other key is collected into ``additional_params``.

    :param context: action context (not used yet; reserved for the auth check)
    :param data_dict: search parameters supplied by the caller
    :returns: whatever the backend's ``search_query`` returns
    """
    # Check auth

    backend = get_search_backend()

    # Validate data_dict. Any key not in the standard schema is moved to
    # additional_params
    # Q: Do we validate just the common interface params here or get
    # additional schema entries from the search feature plugin to validate
    # additional params like `bbox`?
    schema = default_search_query_schema()
    for plugin in PluginImplementations(ISearchFeature):
        # NOTE(review): ISearchFeature.search_schema is declared without
        # parameters and returns the *index* schema, so this call does not
        # match the interface yet -- resolve together with the question above.
        plugin.search_schema(schema)
    # TODO: run the schema validators against data_dict before querying.

    # Split the incoming parameters between named arguments and
    # additional_params.
    # NOTE(review): assumes `schema` is a mapping keyed by parameter name
    # (the usual CKAN schema shape) -- confirm.
    query_dict = {}
    additional_params = dict(data_dict.get("additional_params") or {})
    for key, value in data_dict.items():
        if key == "additional_params":
            # Already used to seed additional_params above
            continue
        if key in schema:
            query_dict[key] = value
        else:
            additional_params[key] = value
    query_dict["additional_params"] = additional_params

    # Call query method of the relevant backend
    return backend.search_query(**query_dict)
# This could be a CLI command or an action
def index(entity_type, entity_ids):
    """Index the given records of one entity type in the search backend.

    :param entity_type: name of the entity type, e.g. ``'package'``
    :param entity_ids: iterable with the ids of the records to index
    """
    # Check auth

    # The backend does not depend on the record, so look it up only once
    backend = get_search_backend()

    for id_ in entity_ids:
        # Get whatever data needs to be indexed, e.g. a validated data_dict
        # for datasets or ISearchEntity.fetch_records() for custom entities
        # NOTE(review): the helper's signature is not defined in this
        # prototype; passing the entity type and id is an assumption.
        data_dict = get_search_data_for_entity(entity_type, id_)

        # Call the index method of the relevant backend
        backend.index_search_record(entity_type, id_, data_dict)
# This will eventually live in CKAN core
class ISearchProvider:
    """Interface implemented by search backends (e.g. a Solr backend).

    The methods here are placeholders that do nothing and return ``None``;
    providers override them.
    """

    def search_query(
        self,
        query: str,
        filters: dict[str, str | list[str]],
        sort: list[list[str]],
        lang: str,
        additional_params: dict[str, Any],
        return_ids: bool,
        return_entity_types: bool,
        return_facets: bool,
        limit: Optional[int],
    ) -> Optional[SearchResults]:
        """Generate search results, or return None if another provider
        should be used for the query.

        :param query: free text query, e.g. ``'water data'``
        :param filters: field filters, e.g.
            ``{'metadata_modified<': '2024-01-01', 'entity_type': ['package']}``
        :param sort: sort criteria, e.g.
            ``[['title'], ['metadata_modified', 'desc']]``
        :param lang: language for text query stemming, e.g. ``'de'``
        :param additional_params: custom parameters this provider may
            process or ignore
        :param return_ids: True: return records as ids (may increase the
            maximum record limit)
        :param return_entity_types: True: wrap records with
            ``{'entity_type': et, 'data': record}`` objects
        :param return_facets: True: return facet counts for available
            indexes
        :param limit: maximum records to return; None: the maximum the
            provider allows
        """

    def initialize_search_provider(
        self, combined_schema: SearchSchema, clear: bool
    ) -> None:
        """Create or update indexes for fields based on the combined search
        schema containing all field names, types and repeating state.
        """

    def index_search_record(
        self, entity_type: str, id_: str, search_data: dict[str, str | list[str]]
    ) -> None:
        """Create or update a search data record in the index."""

    def delete_search_record(self, entity_type: str, id_: str) -> None:
        """Remove a record from the index."""
class ISearchFeature:
    """Interface for plugins that contribute entity types and index fields
    to search.

    The methods here are placeholders that do nothing and return ``None``;
    plugins override them.
    """

    def entity_types(self) -> list[str]:
        """Return the list of entity types covered by this feature."""

    def search_schema(self) -> SearchSchema:
        """Return index field names, their types (text, str, date, numeric)
        and whether they are repeating.
        """

    def format_search_data(
        self, entity_type: str, data: dict[str, Any]
    ) -> dict[str, str | list[str]]:
        """Convert data for this entity type to search data suitable to be
        passed to the search provider index method.
        """

    def existing_record_ids(self, entity_type: str) -> Iterable[str]:
        """Return a list or iterable of all record ids for the given entity
        type managed by this feature.

        Return an empty list for core entity types like 'package' or entity
        types managed by another feature.

        This method is used to identify missing and orphan records in the
        search index.
        """

    def fetch_records(
        self, entity_type: str, records: Optional[Iterable[str]]
    ) -> Iterable[dict[str, Any]]:
        """Generator of all records for this entity type managed by this
        feature, or only records for the ids passed if not None.

        This method is used to rebuild all or some records in the search
        index.
        """
# This would live in the ckanext-search-solr extension
class CKANSearchSolrBackend(ISearchProvider):
    """Solr implementation of the search provider interface (prototype).

    Query construction and record population are still placeholders.
    """

    def search_query(
        self,
        query: str,
        filters: dict[str, str | list[str]],
        sort: list[list[str]],
        lang: str,
        additional_params: dict[str, Any],
        return_ids: bool,
        return_entity_types: bool,
        return_facets: bool,
        limit: Optional[int],
    ):
        """Run a search against Solr.

        Collects the additional parameters processed by every
        ISearchFeature plugin; the Solr query itself is not implemented yet.

        :raises NotImplementedError: always, until the Solr query is written
        """
        processed = {}
        # NOTE(review): process_additional_params is not declared on
        # ISearchFeature yet; it has to return a mapping for update() to work.
        for plugin in PluginImplementations(ISearchFeature):
            processed.update(plugin.process_additional_params(additional_params))

        # Construct actual Solr query
        # Call Solr Client
        # Parse results to adapt them to the common interface format
        # The original returned an undefined `results` name here; fail
        # explicitly until the steps above produce the results to return.
        raise NotImplementedError(
            "Solr query construction is not implemented in this prototype"
        )

    def index_search_record(self, entity_type, id_, data_dict):
        """Create or update the Solr record for one entity.

        Gathers the fields contributed by every ISearchFeature plugin; the
        record is not sent to Solr yet.

        NOTE(review): ISearchProvider names the last parameter
        ``search_data``; consider aligning the names.
        """
        solr_record = {}
        # NOTE(review): augment_search_record is not declared on
        # ISearchFeature yet; it has to return a mapping for update() to work.
        for plugin in PluginImplementations(ISearchFeature):
            solr_record.update(
                plugin.augment_search_record(entity_type, id_, data_dict)
            )

        # the rest of solr_record populated here
        # Q: how? Merging the two?
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment