Created
March 20, 2018 20:14
-
-
Save ognjenm/7a2c857d484a3a054c2d275b822e7799 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| * Copyright 2013-2018 Erudika. https://erudika.com | |
| * | |
| * Licensed under the Apache License, Version 2.0 (the "License"); | |
| * you may not use this file except in compliance with the License. | |
| * You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| * | |
| * For issues and patches go to: https://github.com/erudika | |
| */ | |
| package com.erudika.para.search; | |
| import com.erudika.para.AppCreatedListener; | |
| import com.erudika.para.AppDeletedListener; | |
| import com.erudika.para.core.Address; | |
| import com.erudika.para.core.App; | |
| import com.erudika.para.core.ParaObject; | |
| import com.erudika.para.core.Tag; | |
| import com.erudika.para.core.utils.CoreUtils; | |
| import com.erudika.para.persistence.DAO; | |
| import static com.erudika.para.search.ElasticSearchUtils.PROPS_PREFIX; | |
| import static com.erudika.para.search.ElasticSearchUtils.PROPS_REGEX; | |
| import static com.erudika.para.search.ElasticSearchUtils.convertQueryStringToNestedQuery; | |
| import static com.erudika.para.search.ElasticSearchUtils.getIndexName; | |
| import static com.erudika.para.search.ElasticSearchUtils.getNestedKey; | |
| import static com.erudika.para.search.ElasticSearchUtils.getPager; | |
| import static com.erudika.para.search.ElasticSearchUtils.getTermsQuery; | |
| import static com.erudika.para.search.ElasticSearchUtils.getType; | |
| import static com.erudika.para.search.ElasticSearchUtils.getValueFieldName; | |
| import static com.erudika.para.search.ElasticSearchUtils.isAsyncEnabled; | |
| import static com.erudika.para.search.ElasticSearchUtils.keyValueBoolQuery; | |
| import static com.erudika.para.search.ElasticSearchUtils.nestedMode; | |
| import static com.erudika.para.search.ElasticSearchUtils.nestedPropsQuery; | |
| import static com.erudika.para.search.ElasticSearchUtils.qs; | |
| import com.erudika.para.utils.Config; | |
| import com.erudika.para.utils.Pager; | |
| import com.erudika.para.utils.Utils; | |
| import java.util.ArrayList; | |
| import java.util.Arrays; | |
| import java.util.Collections; | |
| import java.util.HashMap; | |
| import java.util.LinkedList; | |
| import java.util.List; | |
| import java.util.Map; | |
| import javax.inject.Inject; | |
| import javax.inject.Singleton; | |
| import org.apache.commons.lang3.StringUtils; | |
| import org.apache.commons.lang3.math.NumberUtils; | |
| import static org.apache.lucene.search.join.ScoreMode.Avg; | |
| import org.elasticsearch.action.ActionListener; | |
| import org.elasticsearch.action.bulk.BulkRequestBuilder; | |
| import org.elasticsearch.action.bulk.BulkResponse; | |
| import org.elasticsearch.action.delete.DeleteRequest; | |
| import org.elasticsearch.action.delete.DeleteRequestBuilder; | |
| import org.elasticsearch.action.delete.DeleteResponse; | |
| import org.elasticsearch.action.get.GetRequestBuilder; | |
| import org.elasticsearch.action.get.GetResponse; | |
| import org.elasticsearch.action.index.IndexRequestBuilder; | |
| import org.elasticsearch.action.index.IndexResponse; | |
| import org.elasticsearch.action.search.SearchRequestBuilder; | |
| import org.elasticsearch.action.search.SearchResponse; | |
| import org.elasticsearch.action.search.SearchType; | |
| import org.elasticsearch.client.Client; | |
| import org.elasticsearch.common.unit.DistanceUnit; | |
| import org.elasticsearch.common.unit.TimeValue; | |
| import org.elasticsearch.index.IndexNotFoundException; | |
| import org.elasticsearch.index.query.BoolQueryBuilder; | |
| import org.elasticsearch.index.query.MoreLikeThisQueryBuilder.Item; | |
| import org.elasticsearch.index.query.QueryBuilder; | |
| import static org.elasticsearch.index.query.QueryBuilders.boolQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.geoDistanceQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.idsQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.matchQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.moreLikeThisQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.nestedQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.prefixQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.queryStringQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.termQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.termsQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.wildcardQuery; | |
| import org.elasticsearch.search.SearchHit; | |
| import org.elasticsearch.search.SearchHits; | |
| import org.elasticsearch.search.sort.SortBuilder; | |
| import org.elasticsearch.search.sort.SortBuilders; | |
| import org.elasticsearch.search.sort.SortOrder; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| /** | |
| * An implementation of the {@link Search} interface using ElasticSearch. | |
| * @author Alex Bogdanovski [alex@erudika.com] | |
| */ | |
| @Singleton | |
| public class ElasticSearch implements Search { | |
| private static final Logger logger = LoggerFactory.getLogger(ElasticSearch.class); | |
| private DAO dao; | |
/**
 * No-args constructor.
 * Requests the client from {@link ElasticSearchUtils} when search is enabled and registers
 * listeners which react to the creation and deletion of apps.
 */
public ElasticSearch() {
    if (Config.isSearchEnabled()) {
        ElasticSearchUtils.getClient();
    }
    // a new app gets either a routed alias on the root index or an index of its own
    App.addAppCreatedListener(new AppCreatedListener() {
        @Override
        public void onAppCreated(App app) {
            if (app == null) {
                return;
            }
            final String appid = app.getAppIdentifier();
            if (app.isSharingIndex()) {
                ElasticSearchUtils.addIndexAliasWithRouting(Config.getRootAppIdentifier(), appid);
                return;
            }
            final int shards;
            final int replicas;
            if (app.isRootApp()) {
                shards = Config.getConfigInt("es.shards", 5);
                replicas = Config.getConfigInt("es.replicas", 0);
            } else {
                shards = Config.getConfigInt("es.shards_for_child_apps", 2);
                replicas = Config.getConfigInt("es.replicas_for_child_apps", 0);
            }
            ElasticSearchUtils.createIndex(appid, shards, replicas);
        }
    });
    // a deleted app loses its documents and alias, or its whole index
    App.addAppDeletedListener(new AppDeletedListener() {
        @Override
        public void onAppDeleted(App app) {
            if (app == null) {
                return;
            }
            final String appid = app.getAppIdentifier();
            if (!app.isSharingIndex()) {
                ElasticSearchUtils.deleteIndex(appid);
                return;
            }
            CoreUtils.getInstance().getSearch().unindexAll(appid, null, true);
            ElasticSearchUtils.removeIndexAlias(Config.getRootAppIdentifier(), appid);
        }
    });
}
/**
 * Default constructor, used for dependency injection.
 * NOTE(review): unlike the no-args constructor, this one registers no app listeners and
 * does not request the client - confirm that this is intended.
 * @param dao an instance of the persistence class
 */
@Inject
public ElasticSearch(DAO dao) {
    this.dao = dao;
}
| private DAO getDAO() { | |
| if (dao == null) { | |
| return CoreUtils.getInstance().getDao(); | |
| } | |
| return dao; | |
| } | |
| Client client() { | |
| return ElasticSearchUtils.getClient(); | |
| } | |
| @Override | |
| public void index(String appid, ParaObject po) { | |
| if (po == null || StringUtils.isBlank(appid)) { | |
| return; | |
| } | |
| try { | |
| IndexRequestBuilder irb = client().prepareIndex(getIndexName(appid), getType(), po.getId()). | |
| setSource(ElasticSearchUtils.getSourceFromParaObject(po)); | |
| ActionListener<IndexResponse> responseHandler = ElasticSearchUtils.getIndexResponseHandler(); | |
| if (isAsyncEnabled()) { | |
| irb.execute(responseHandler); | |
| } else { | |
| responseHandler.onResponse(irb.execute().actionGet()); | |
| } | |
| logger.debug("Search.index() {}", po.getId()); | |
| } catch (Exception e) { | |
| logger.warn(null, e); | |
| } | |
| } | |
| @Override | |
| public void unindex(String appid, ParaObject po) { | |
| if (po == null || StringUtils.isBlank(po.getId()) || StringUtils.isBlank(appid)) { | |
| return; | |
| } | |
| try { | |
| DeleteRequestBuilder drb = client().prepareDelete(getIndexName(appid), getType(), po.getId()); | |
| ActionListener<DeleteResponse> responseHandler = ElasticSearchUtils.getIndexResponseHandler(); | |
| if (isAsyncEnabled()) { | |
| drb.execute(responseHandler); | |
| } else { | |
| responseHandler.onResponse(drb.execute().actionGet()); | |
| } | |
| logger.debug("Search.unindex() {}", po.getId()); | |
| } catch (Exception e) { | |
| logger.warn(null, e); | |
| } | |
| } | |
| @Override | |
| public <P extends ParaObject> void indexAll(String appid, List<P> objects) { | |
| if (StringUtils.isBlank(appid) || objects == null || objects.isEmpty()) { | |
| return; | |
| } | |
| BulkRequestBuilder brb = client().prepareBulk(); | |
| for (ParaObject po : objects) { | |
| brb.add(client().prepareIndex(getIndexName(appid), getType(), po.getId()). | |
| setSource(ElasticSearchUtils.getSourceFromParaObject(po))); | |
| } | |
| if (brb.numberOfActions() > 0) { | |
| ActionListener<BulkResponse> responseHandler = ElasticSearchUtils.getBulkIndexResponseHandler(); | |
| if (isAsyncEnabled()) { | |
| brb.execute(responseHandler); | |
| } else { | |
| responseHandler.onResponse(brb.execute().actionGet()); | |
| } | |
| } | |
| logger.debug("Search.indexAll() {}", objects.size()); | |
| } | |
| @Override | |
| public <P extends ParaObject> void unindexAll(String appid, List<P> objects) { | |
| if (StringUtils.isBlank(appid) || objects == null || objects.isEmpty()) { | |
| return; | |
| } | |
| BulkRequestBuilder brb = client().prepareBulk(); | |
| for (ParaObject po : objects) { | |
| brb.add(client().prepareDelete(getIndexName(appid), getType(), po.getId())); | |
| } | |
| if (brb.numberOfActions() > 0) { | |
| ActionListener<BulkResponse> responseHandler = ElasticSearchUtils.getBulkIndexResponseHandler(); | |
| if (isAsyncEnabled()) { | |
| brb.execute(responseHandler); | |
| } else { | |
| responseHandler.onResponse(brb.execute().actionGet()); | |
| } | |
| } | |
| logger.debug("Search.unindexAll() {}", objects.size()); | |
| } | |
| @Override | |
| public void unindexAll(String appid, Map<String, ?> terms, boolean matchAll) { | |
| if (StringUtils.isBlank(appid)) { | |
| return; | |
| } | |
| QueryBuilder fb = (terms == null || terms.isEmpty()) ? | |
| matchAllQuery() : getTermsQuery(terms, matchAll); | |
| SearchResponse scrollResp = client().prepareSearch(getIndexName(appid)) | |
| .setScroll(new TimeValue(60000)) | |
| .setQuery(fb) | |
| .setSize(100).execute().actionGet(); | |
| BulkRequestBuilder brb = client().prepareBulk(); | |
| while (true) { | |
| for (SearchHit hit : scrollResp.getHits()) { | |
| brb.add(new DeleteRequest(getIndexName(appid), getType(), hit.getId())); | |
| } | |
| // next page | |
| scrollResp = client().prepareSearchScroll(scrollResp.getScrollId()). | |
| setScroll(new TimeValue(600000)).execute().actionGet(); | |
| if (scrollResp.getHits().getHits().length == 0) { | |
| break; | |
| } | |
| } | |
| if (brb.numberOfActions() > 0) { | |
| BulkResponse result = brb.execute().actionGet(); | |
| if (result.hasFailures()) { | |
| logger.warn("Unindexed {} documents with failures ({}), took {}s.", brb.numberOfActions(), | |
| result.buildFailureMessage(), result.getTook().seconds()); | |
| } else { | |
| logger.info("Unindexed {} documents without failures, took {}s.", | |
| brb.numberOfActions(), result.getTook().seconds()); | |
| } | |
| } | |
| } | |
| @Override | |
| public <P extends ParaObject> P findById(String appid, String id) { | |
| try { | |
| return ElasticSearchUtils.getParaObjectFromSource(getSource(appid, id)); | |
| } catch (Exception e) { | |
| logger.warn(null, e); | |
| return null; | |
| } | |
| } | |
| @Override | |
| @SuppressWarnings("unchecked") | |
| public <P extends ParaObject> List<P> findByIds(String appid, List<String> ids) { | |
| List<P> list = new LinkedList<P>(); | |
| if (ids == null || ids.isEmpty()) { | |
| return list; | |
| } | |
| try { | |
| QueryBuilder qb = termsQuery(Config._ID, ids); | |
| return searchQuery(appid, null, qb); | |
| } catch (Exception e) { | |
| logger.warn(null, e); | |
| } | |
| return list; | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findTermInList(String appid, String type, | |
| String field, List<?> terms, Pager... pager) { | |
| if (StringUtils.isBlank(field) || terms == null) { | |
| return Collections.emptyList(); | |
| } | |
| QueryBuilder qb; | |
| if (nestedMode() && field.startsWith(PROPS_PREFIX)) { | |
| QueryBuilder bfb = null; | |
| BoolQueryBuilder fb = boolQuery(); | |
| for (Object term : terms) { | |
| bfb = keyValueBoolQuery(field, String.valueOf(term)); | |
| fb.should(bfb); | |
| } | |
| qb = nestedPropsQuery(terms.size() > 1 ? fb : bfb); | |
| } else { | |
| qb = termsQuery(field, terms); | |
| } | |
| return searchQuery(appid, type, qb, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findPrefix(String appid, String type, | |
| String field, String prefix, Pager... pager) { | |
| if (StringUtils.isBlank(field) || StringUtils.isBlank(prefix)) { | |
| return Collections.emptyList(); | |
| } | |
| QueryBuilder qb; | |
| if (nestedMode() && field.startsWith(PROPS_PREFIX)) { | |
| qb = nestedPropsQuery(keyValueBoolQuery(field, prefixQuery(getValueFieldName(prefix), prefix))); | |
| } else { | |
| qb = prefixQuery(field, prefix); | |
| } | |
| return searchQuery(appid, type, qb, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findQuery(String appid, String type, | |
| String query, Pager... pager) { | |
| if (StringUtils.isBlank(query)) { | |
| return Collections.emptyList(); | |
| } | |
| // a basic implementation of support for nested queries in query string | |
| // https://github.com/elastic/elasticsearch/issues/11322 | |
| QueryBuilder qb; | |
| if (nestedMode() && query.matches(PROPS_REGEX)) { | |
| qb = convertQueryStringToNestedQuery(query); | |
| if (qb == null) { | |
| return Collections.emptyList(); | |
| } | |
| } else { | |
| qb = queryStringQuery(qs(query)).allowLeadingWildcard(false); | |
| } | |
| return searchQuery(appid, type, qb, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findNestedQuery(String appid, String type, String field, | |
| String query, Pager... pager) { | |
| if (StringUtils.isBlank(query) || StringUtils.isBlank(field)) { | |
| return Collections.emptyList(); | |
| } | |
| String queryString = "nstd." + field + ":" + query; | |
| QueryBuilder qb = nestedQuery("nstd", queryStringQuery(qs(queryString)), Avg); | |
| return searchQuery(appid, type, qb, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findWildcard(String appid, String type, | |
| String field, String wildcard, Pager... pager) { | |
| if (StringUtils.isBlank(field) || StringUtils.isBlank(wildcard)) { | |
| return Collections.emptyList(); | |
| } | |
| QueryBuilder qb; | |
| if (nestedMode() && field.startsWith(PROPS_PREFIX)) { | |
| qb = nestedPropsQuery(keyValueBoolQuery(field, wildcardQuery(getValueFieldName(wildcard), wildcard))); | |
| } else { | |
| qb = wildcardQuery(field, wildcard); | |
| } | |
| return searchQuery(appid, type, qb, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findTagged(String appid, String type, | |
| String[] tags, Pager... pager) { | |
| if (tags == null || tags.length == 0 || StringUtils.isBlank(appid)) { | |
| return Collections.emptyList(); | |
| } | |
| BoolQueryBuilder tagFilter = boolQuery(); | |
| //assuming clean & safe tags here | |
| for (String tag : tags) { | |
| tagFilter.must(termQuery(Config._TAGS, tag)); | |
| } | |
| // The filter looks like this: ("tag1" OR "tag2" OR "tag3") AND "type" | |
| return searchQuery(appid, type, tagFilter, pager); | |
| } | |
| @Override | |
| @SuppressWarnings("unchecked") | |
| public <P extends ParaObject> List<P> findTerms(String appid, String type, | |
| Map<String, ?> terms, boolean mustMatchAll, Pager... pager) { | |
| if (terms == null || terms.isEmpty()) { | |
| return Collections.emptyList(); | |
| } | |
| QueryBuilder fb = getTermsQuery(terms, mustMatchAll); | |
| if (fb == null) { | |
| return Collections.emptyList(); | |
| } else { | |
| return searchQuery(appid, type, fb, pager); | |
| } | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findSimilar(String appid, String type, String filterKey, | |
| String[] fields, String liketext, Pager... pager) { | |
| if (StringUtils.isBlank(liketext)) { | |
| return Collections.emptyList(); | |
| } | |
| QueryBuilder qb; | |
| if (fields == null || fields.length == 0) { | |
| qb = moreLikeThisQuery(new String[]{liketext}).minDocFreq(1).minTermFreq(1); | |
| } else { | |
| boolean containsNestedProps = Arrays.stream(fields).anyMatch((f) -> StringUtils.startsWith(f, PROPS_PREFIX)); | |
| if (nestedMode() && containsNestedProps) { | |
| BoolQueryBuilder bqb = boolQuery(); | |
| for (String field : fields) { | |
| QueryBuilder kQuery = matchQuery(PROPS_PREFIX + "k", getNestedKey(field)); | |
| QueryBuilder vQuery = moreLikeThisQuery(new String[]{PROPS_PREFIX + "v"}, | |
| new String[]{liketext}, Item.EMPTY_ARRAY).minDocFreq(1).minTermFreq(1); | |
| bqb.should(nestedPropsQuery(boolQuery().must(kQuery).must(vQuery))); | |
| } | |
| qb = bqb; | |
| } else { | |
| qb = moreLikeThisQuery(fields, new String[]{liketext}, Item.EMPTY_ARRAY). | |
| minDocFreq(1).minTermFreq(1); | |
| } | |
| } | |
| if (!StringUtils.isBlank(filterKey)) { | |
| qb = boolQuery().mustNot(termQuery(Config._ID, filterKey)).filter(qb); | |
| } | |
| return searchQuery(appid, searchQueryRaw(appid, type, qb, pager)); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findTags(String appid, String keyword, Pager... pager) { | |
| if (StringUtils.isBlank(keyword)) { | |
| return Collections.emptyList(); | |
| } | |
| QueryBuilder qb = wildcardQuery("tag", keyword.concat("*")); | |
| return searchQuery(appid, Utils.type(Tag.class), qb, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findNearby(String appid, String type, | |
| String query, int radius, double lat, double lng, Pager... pager) { | |
| if (StringUtils.isBlank(type) || StringUtils.isBlank(appid)) { | |
| return Collections.emptyList(); | |
| } | |
| if (StringUtils.isBlank(query)) { | |
| query = "*"; | |
| } | |
| // find nearby Address objects | |
| Pager page = getPager(pager); | |
| QueryBuilder qb1 = geoDistanceQuery("latlng").point(lat, lng).distance(radius, DistanceUnit.KILOMETERS); | |
| SearchHits hits1 = searchQueryRaw(appid, Utils.type(Address.class), qb1, page); | |
| page.setLastKey(null); // will cause problems if not cleared | |
| if (hits1 == null) { | |
| return Collections.emptyList(); | |
| } | |
| if (type.equals(Utils.type(Address.class))) { | |
| return searchQuery(appid, hits1); | |
| } | |
| // then find their parent objects | |
| String[] parentids = new String[hits1.getHits().length]; | |
| for (int i = 0; i < hits1.getHits().length; i++) { | |
| Object pid = hits1.getAt(i).getSourceAsMap().get(Config._PARENTID); | |
| if (pid != null) { | |
| parentids[i] = (String) pid; | |
| } | |
| } | |
| QueryBuilder qb2 = boolQuery().must(queryStringQuery(qs(query))).filter(idsQuery().addIds(parentids)); | |
| SearchHits hits2 = searchQueryRaw(appid, type, qb2, page); | |
| return searchQuery(appid, hits2); | |
| } | |
| private <P extends ParaObject> List<P> searchQuery(String appid, String type, | |
| QueryBuilder query, Pager... pager) { | |
| return searchQuery(appid, searchQueryRaw(appid, type, query, pager)); | |
| } | |
| /** | |
| * Processes the results of searchQueryRaw() and fetches the results from the data store (can be disabled). | |
| * @param <P> type of object | |
| * @param appid name of the {@link com.erudika.para.core.App} | |
| * @param hits the search results from a query | |
| * @return the list of object found | |
| */ | |
| protected <P extends ParaObject> List<P> searchQuery(final String appid, SearchHits hits) { | |
| if (hits == null) { | |
| return Collections.emptyList(); | |
| } | |
| List<P> results = new ArrayList<P>(hits.getHits().length); | |
| List<String> keys = new LinkedList<String>(); | |
| boolean readFromIndex = Config.getConfigBoolean("read_from_index", Config.ENVIRONMENT.equals("embedded")); | |
| try { | |
| for (SearchHit hit : hits) { | |
| if (readFromIndex) { | |
| P pobj = ElasticSearchUtils.getParaObjectFromSource(hit.getSourceAsMap()); | |
| results.add(pobj); | |
| } else { | |
| keys.add(hit.getId()); | |
| } | |
| logger.debug("Search result: appid={}, {}->{}", appid, hit.getSourceAsMap().get(Config._APPID), hit.getId()); | |
| } | |
| if (!readFromIndex && !keys.isEmpty()) { | |
| List<String> objectsMissingFromDB = new ArrayList<String>(results.size()); | |
| Map<String, P> fromDB = getDAO().readAll(appid, keys, true); | |
| for (int i = 0; i < keys.size(); i++) { | |
| String key = keys.get(i); | |
| P pobj = fromDB.get(key); | |
| if (pobj == null) { | |
| pobj = ElasticSearchUtils.getParaObjectFromSource(hits.getAt(i).getSourceAsMap()); | |
| // show warning that object is still in index but not in DB | |
| if (pobj != null && appid.equals(pobj.getAppid()) && pobj.getStored()) { | |
| objectsMissingFromDB.add(key); | |
| } | |
| } | |
| if (pobj != null) { | |
| results.add(pobj); | |
| } | |
| } | |
| if (!objectsMissingFromDB.isEmpty()) { | |
| logger.warn("Found {} objects in app '{}' that are indexed but not in the database: {}", | |
| objectsMissingFromDB.size(), appid, objectsMissingFromDB); | |
| } | |
| } | |
| } catch (Exception e) { | |
| Throwable cause = e.getCause(); | |
| String msg = cause != null ? cause.getMessage() : e.getMessage(); | |
| logger.warn("Search query failed for app '{}': {}", appid, msg); | |
| } | |
| return results; | |
| } | |
| /** | |
| * Executes an ElasticSearch query. This is the core method of the class. | |
| * @param appid name of the {@link com.erudika.para.core.App} | |
| * @param type type of object | |
| * @param query the search query builder | |
| * @param pager a {@link com.erudika.para.utils.Pager} | |
| * @return a list of search results | |
| */ | |
| protected SearchHits searchQueryRaw(String appid, String type, QueryBuilder query, Pager... pager) { | |
| if (StringUtils.isBlank(appid)) { | |
| return null; | |
| } | |
| Pager page = ElasticSearchUtils.getPager(pager); | |
| SortOrder order = page.isDesc() ? SortOrder.DESC : SortOrder.ASC; | |
| int max = page.getLimit(); | |
| int pageNum = (int) page.getPage(); | |
| int start = (pageNum < 1 || pageNum > Config.MAX_PAGES) ? 0 : (pageNum - 1) * max; | |
| if (query == null) { | |
| query = matchAllQuery(); | |
| } | |
| if (!StringUtils.isBlank(type)) { | |
| query = boolQuery().must(query).must(termQuery(Config._TYPE, type)); | |
| } | |
| SearchHits hits = null; | |
| try { | |
| SearchRequestBuilder srb = client().prepareSearch(getIndexName(appid)). | |
| setSearchType(SearchType.DFS_QUERY_THEN_FETCH). | |
| setQuery(query). | |
| setSize(max); | |
| if (pageNum <= 1 && !StringUtils.isBlank(page.getLastKey())) { | |
| srb.searchAfter(new Object[]{NumberUtils.toLong(page.getLastKey())}); | |
| srb.setFrom(0); | |
| srb.addSort(SortBuilders.fieldSort("_docid").order(order)); | |
| } else { | |
| srb.setFrom(start); | |
| for (SortBuilder<?> sortField : ElasticSearchUtils.getSortFieldsFromPager(page)) { | |
| srb.addSort(sortField); | |
| } | |
| } | |
| logger.debug("Elasticsearch query: {}", srb.toString()); | |
| hits = srb.execute().actionGet().getHits(); | |
| page.setCount(hits.getTotalHits()); | |
| if (hits.getHits().length > 0) { | |
| Object id = hits.getAt(hits.getHits().length - 1).getSourceAsMap().get("_docid"); | |
| if (id != null) { | |
| page.setLastKey(id.toString()); | |
| } | |
| } | |
| } catch (Exception e) { | |
| Throwable cause = e.getCause(); | |
| String msg = cause != null ? cause.getMessage() : e.getMessage(); | |
| logger.warn("No search results for type '{}' in app '{}': {}.", type, appid, msg); | |
| } | |
| return hits; | |
| } | |
| /** | |
| * Returns the source (a map of fields and values) for and object. | |
| * The source is extracted from the index directly not the data store. | |
| * @param appid name of the {@link com.erudika.para.core.App} | |
| * @param key the object id | |
| * @return a map representation of the object | |
| */ | |
| protected Map<String, Object> getSource(String appid, String key) { | |
| Map<String, Object> map = new HashMap<String, Object>(); | |
| if (StringUtils.isBlank(key) || StringUtils.isBlank(appid)) { | |
| return map; | |
| } | |
| try { | |
| GetRequestBuilder grb = client().prepareGet().setIndex(getIndexName(appid)).setId(key); | |
| GetResponse gres = grb.execute().actionGet(); | |
| if (gres.isExists()) { | |
| map = gres.getSource(); | |
| } | |
| } catch (IndexNotFoundException ex) { | |
| logger.warn("Index not created yet. Call '_setup' first."); | |
| } catch (Exception e) { | |
| Throwable cause = e.getCause(); | |
| String msg = cause != null ? cause.getMessage() : e.getMessage(); | |
| logger.warn("Could not get any data from index '{}': {}", appid, msg); | |
| } | |
| return map; | |
| } | |
| @Override | |
| public Long getCount(String appid, String type) { | |
| if (StringUtils.isBlank(appid)) { | |
| return 0L; | |
| } | |
| QueryBuilder query; | |
| if (!StringUtils.isBlank(type)) { | |
| query = termQuery(Config._TYPE, type); | |
| } else { | |
| query = matchAllQuery(); | |
| } | |
| Long count = 0L; | |
| try { | |
| SearchRequestBuilder crb = client().prepareSearch(getIndexName(appid)).setSize(0).setQuery(query); | |
| count = crb.execute().actionGet().getHits().getTotalHits(); | |
| } catch (Exception e) { | |
| Throwable cause = e.getCause(); | |
| String msg = cause != null ? cause.getMessage() : e.getMessage(); | |
| logger.warn("Could not count results in index '{}': {}", appid, msg); | |
| } | |
| return count; | |
| } | |
| @Override | |
| public Long getCount(String appid, String type, Map<String, ?> terms) { | |
| if (StringUtils.isBlank(appid) || terms == null || terms.isEmpty()) { | |
| return 0L; | |
| } | |
| Long count = 0L; | |
| QueryBuilder query = getTermsQuery(terms, true); | |
| if (query != null) { | |
| if (!StringUtils.isBlank(type)) { | |
| query = boolQuery().must(query).must(termQuery(Config._TYPE, type)); | |
| } | |
| try { | |
| SearchRequestBuilder crb = client().prepareSearch(getIndexName(appid)).setSize(0).setQuery(query); | |
| count = crb.execute().actionGet().getHits().getTotalHits(); | |
| } catch (Exception e) { | |
| Throwable cause = e.getCause(); | |
| String msg = cause != null ? cause.getMessage() : e.getMessage(); | |
| logger.warn("Could not count results in index '{}': {}", appid, msg); | |
| } | |
| } | |
| return count; | |
| } | |
| ////////////////////////////////////////////////////////////// | |
| @Override | |
| public void index(ParaObject so) { | |
| index(Config.getRootAppIdentifier(), so); | |
| } | |
| @Override | |
| public void unindex(ParaObject so) { | |
| unindex(Config.getRootAppIdentifier(), so); | |
| } | |
| @Override | |
| public <P extends ParaObject> void indexAll(List<P> objects) { | |
| indexAll(Config.getRootAppIdentifier(), objects); | |
| } | |
| @Override | |
| public <P extends ParaObject> void unindexAll(List<P> objects) { | |
| unindexAll(Config.getRootAppIdentifier(), objects); | |
| } | |
| @Override | |
| public void unindexAll(Map<String, ?> terms, boolean matchAll) { | |
| unindexAll(Config.getRootAppIdentifier(), terms, matchAll); | |
| } | |
| @Override | |
| public <P extends ParaObject> P findById(String id) { | |
| return findById(Config.getRootAppIdentifier(), id); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findByIds(List<String> ids) { | |
| return findByIds(Config.getRootAppIdentifier(), ids); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findNearby(String type, | |
| String query, int radius, double lat, double lng, Pager... pager) { | |
| return findNearby(Config.getRootAppIdentifier(), type, query, radius, lat, lng, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findPrefix(String type, String field, String prefix, Pager... pager) { | |
| return findPrefix(Config.getRootAppIdentifier(), type, field, prefix, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findQuery(String type, String query, Pager... pager) { | |
| return findQuery(Config.getRootAppIdentifier(), type, query, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findNestedQuery(String type, String field, String query, Pager... pager) { | |
| return findNestedQuery(Config.getRootAppIdentifier(), type, field, query, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findSimilar(String type, String filterKey, String[] fields, | |
| String liketext, Pager... pager) { | |
| return findSimilar(Config.getRootAppIdentifier(), type, filterKey, fields, liketext, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findTagged(String type, String[] tags, Pager... pager) { | |
| return findTagged(Config.getRootAppIdentifier(), type, tags, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findTags(String keyword, Pager... pager) { | |
| return findTags(Config.getRootAppIdentifier(), keyword, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findTermInList(String type, String field, | |
| List<?> terms, Pager... pager) { | |
| return findTermInList(Config.getRootAppIdentifier(), type, field, terms, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findTerms(String type, Map<String, ?> terms, | |
| boolean mustMatchBoth, Pager... pager) { | |
| return findTerms(Config.getRootAppIdentifier(), type, terms, mustMatchBoth, pager); | |
| } | |
| @Override | |
| public <P extends ParaObject> List<P> findWildcard(String type, String field, String wildcard, | |
| Pager... pager) { | |
| return findWildcard(Config.getRootAppIdentifier(), type, field, wildcard, pager); | |
| } | |
| @Override | |
| public Long getCount(String type) { | |
| return getCount(Config.getRootAppIdentifier(), type); | |
| } | |
| @Override | |
| public Long getCount(String type, Map<String, ?> terms) { | |
| return getCount(Config.getRootAppIdentifier(), type, terms); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| * Copyright 2013-2018 Erudika. https://erudika.com | |
| * | |
| * Licensed under the Apache License, Version 2.0 (the "License"); | |
| * you may not use this file except in compliance with the License. | |
| * You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| * | |
| * For issues and patches go to: https://github.com/erudika | |
| */ | |
| package com.erudika.para.search; | |
| import com.erudika.para.core.App; | |
| import com.erudika.para.core.ParaObject; | |
| import com.erudika.para.core.Sysprop; | |
| import com.erudika.para.core.utils.ParaObjectUtils; | |
| import com.erudika.para.persistence.DAO; | |
| import com.erudika.para.utils.Config; | |
| import com.erudika.para.utils.Pager; | |
| import com.erudika.para.utils.Utils; | |
| import java.net.InetAddress; | |
| import java.net.UnknownHostException; | |
| import java.util.ArrayList; | |
| import java.util.Collections; | |
| import java.util.HashMap; | |
| import java.util.LinkedList; | |
| import java.util.List; | |
| import java.util.Map; | |
| import java.util.Map.Entry; | |
| import java.util.regex.Matcher; | |
| import java.util.regex.Pattern; | |
| import org.apache.commons.lang3.StringUtils; | |
| import org.apache.commons.lang3.math.NumberUtils; | |
| import org.apache.lucene.index.Term; | |
| import org.apache.lucene.queryparser.flexible.standard.StandardQueryParser; | |
| import org.apache.lucene.search.BooleanClause; | |
| import org.apache.lucene.search.BooleanQuery; | |
| import org.apache.lucene.search.BoostQuery; | |
| import org.apache.lucene.search.FuzzyQuery; | |
| import org.apache.lucene.search.PrefixQuery; | |
| import org.apache.lucene.search.Query; | |
| import org.apache.lucene.search.TermQuery; | |
| import org.apache.lucene.search.TermRangeQuery; | |
| import org.apache.lucene.search.WildcardQuery; | |
| import static org.apache.lucene.search.join.ScoreMode.Avg; | |
| import org.elasticsearch.action.ActionListener; | |
| import org.elasticsearch.action.DocWriteResponse; | |
| import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; | |
| import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; | |
| import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; | |
| import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions; | |
| import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; | |
| import org.elasticsearch.action.admin.indices.get.GetIndexResponse; | |
| import org.elasticsearch.action.bulk.BulkRequestBuilder; | |
| import org.elasticsearch.action.bulk.BulkResponse; | |
| import org.elasticsearch.client.Client; | |
| import org.elasticsearch.client.transport.TransportClient; | |
| import org.elasticsearch.cluster.health.ClusterHealthStatus; | |
| import org.elasticsearch.common.settings.Settings; | |
| import org.elasticsearch.common.transport.TransportAddress; | |
| import org.elasticsearch.common.xcontent.XContentType; | |
| import org.elasticsearch.index.query.BoolQueryBuilder; | |
| import org.elasticsearch.index.query.NestedQueryBuilder; | |
| import org.elasticsearch.index.query.QueryBuilder; | |
| import static org.elasticsearch.index.query.QueryBuilders.boolQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.fuzzyQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.matchQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.multiMatchQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.nestedQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.prefixQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.termQuery; | |
| import static org.elasticsearch.index.query.QueryBuilders.wildcardQuery; | |
| import org.elasticsearch.index.query.RangeQueryBuilder; | |
| import org.elasticsearch.search.sort.SortBuilder; | |
| import org.elasticsearch.search.sort.SortBuilders; | |
| import org.elasticsearch.search.sort.SortOrder; | |
| import org.elasticsearch.transport.client.PreBuiltTransportClient; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| /** | |
| * Helper utilities for connecting to an Elasticsearch cluster. | |
| * @author Alex Bogdanovski [alex@erudika.com] | |
| */ | |
| public final class ElasticSearchUtils { | |
| private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class); | |
| private static TransportClient searchClient; | |
| private static final int MAX_QUERY_DEPTH = 10; // recursive depth for compound queries - bool, boost | |
| private static final String DATE_FORMAT = "epoch_millis||epoch_second||yyyy-MM-dd HH:mm:ss||" | |
| + "yyyy-MM-dd||yyyy/MM/dd||yyyyMMdd||yyyy"; | |
| static final String PROPS_FIELD = "properties"; | |
| static final String PROPS_PREFIX = PROPS_FIELD + "."; | |
| static final String PROPS_JSON = "_" + PROPS_FIELD; | |
| static final String PROPS_REGEX = "(^|.*\\W)" + PROPS_FIELD + "[\\.\\:].+"; | |
| /** | |
| * Switches between normal indexing and indexing with nested key/value objects for Sysprop.properties. | |
| * When this is 'false' (normal mode), Para objects will be indexed without modification but this could lead to | |
| * a field mapping explosion and crash the ES cluster. | |
| * | |
| * When set to 'true' (nested mode), Para objects will be indexed with all custom fields flattened to an array of | |
| * key/value properties: properties: [{"k": "field", "v": "value"},...]. This is done for Sysprop objects with | |
| * containing custom properties. This mode prevents an eventual field mapping explosion. | |
| */ | |
| static boolean nestedMode() { | |
| return Config.getConfigBoolean("es.use_nested_custom_fields", false); | |
| } | |
| /** | |
| * A list of default mappings that are defined upon index creation. | |
| */ | |
| private static String getDefaultMapping() { | |
| return "{\n" + | |
| " \"paraobject\": {\n" + | |
| " \"properties\": {\n" + | |
| " \"nstd\": {\"type\": \"nested\"},\n" + | |
| " \"properties\": {\"type\": \"" + (nestedMode() ? "nested" : "object") + "\"},\n" + | |
| " \"latlng\": {\"type\": \"geo_point\"},\n" + | |
| " \"_docid\": {\"type\": \"long\", \"index\": false},\n" + | |
| " \"updated\": {\"type\": \"date\", \"format\" : \"" + DATE_FORMAT + "\"},\n" + | |
| " \"timestamp\": {\"type\": \"date\", \"format\" : \"" + DATE_FORMAT + "\"},\n" + | |
| " \"tag\": {\"type\": \"keyword\"},\n" + | |
| " \"id\": {\"type\": \"keyword\"},\n" + | |
| " \"key\": {\"type\": \"keyword\"},\n" + | |
| " \"name\": {\"type\": \"keyword\"},\n" + | |
| " \"type\": {\"type\": \"keyword\"},\n" + | |
| " \"tags\": {\"type\": \"keyword\"},\n" + | |
| " \"token\": {\"type\": \"keyword\"},\n" + | |
| " \"email\": {\"type\": \"keyword\"},\n" + | |
| " \"appid\": {\"type\": \"keyword\"},\n" + | |
| " \"groups\": {\"type\": \"keyword\"},\n" + | |
| " \"password\": {\"type\": \"keyword\"},\n" + | |
| " \"parentid\": {\"type\": \"keyword\"},\n" + | |
| " \"creatorid\": {\"type\": \"keyword\"},\n" + | |
| " \"identifier\": {\"type\": \"keyword\"}\n" + | |
| " }\n" + | |
| " }\n" + | |
| "}"; | |
| } | |
| /** | |
| * These fields are not indexed. | |
| */ | |
| private static final String[] IGNORED_FIELDS = new String[] { | |
| "settings", // App | |
| "datatypes", // App | |
| "deviceState", // Thing | |
| "deviceMetadata", // Thing | |
| "resourcePermissions", // App | |
| "validationConstraints" // App | |
| }; | |
/**
 * Not instantiable - this class only has static members.
 */
private ElasticSearchUtils() {
}
| /** | |
| * Creates an instance of the client that talks to Elasticsearch. | |
| * @return a client instance | |
| */ | |
| public static Client getClient() { | |
| if (searchClient != null) { | |
| return searchClient; | |
| } | |
| String esHost = Config.getConfigParam("es.transportclient_host", "localhost"); | |
| int esPort = Config.getConfigInt("es.transportclient_port", 9300); | |
| boolean useTransportClient = Config.getConfigBoolean("es.use_transportclient", true); | |
| Settings.Builder settings = Settings.builder(); | |
| settings.put("client.transport.sniff", true); | |
| settings.put("cluster.name", Config.CLUSTER_NAME); | |
| if (useTransportClient) { | |
| searchClient = new PreBuiltTransportClient(settings.build()); | |
| TransportAddress addr; | |
| try { | |
| addr = new TransportAddress(InetAddress.getByName(esHost), esPort); | |
| } catch (UnknownHostException ex) { | |
| addr = new TransportAddress(InetAddress.getLoopbackAddress(), esPort); | |
| logger.warn("Unknown host: " + esHost, ex); | |
| } | |
| searchClient.addTransportAddress(addr); | |
| } else { | |
| throw new UnsupportedOperationException("REST client is yet to be supported in Elasticsearch v6."); | |
| } | |
| Runtime.getRuntime().addShutdownHook(new Thread() { | |
| public void run() { | |
| shutdownClient(); | |
| } | |
| }); | |
| // wait for the shards to initialize - prevents NoShardAvailableActionException! | |
| String timeout = Config.IN_PRODUCTION ? "1m" : "5s"; | |
| searchClient.admin().cluster().prepareHealth(Config.getRootAppIdentifier()). | |
| setWaitForGreenStatus().setTimeout(timeout).execute().actionGet(); | |
| if (!existsIndex(Config.getRootAppIdentifier())) { | |
| createIndex(Config.getRootAppIdentifier()); | |
| } | |
| return searchClient; | |
| } | |
| /** | |
| * Stops the client instance and releases resources. | |
| */ | |
| protected static void shutdownClient() { | |
| if (searchClient != null) { | |
| searchClient.close(); | |
| searchClient = null; | |
| } | |
| } | |
| private static boolean createIndexWithoutAlias(String name, int shards, int replicas) { | |
| if (StringUtils.isBlank(name) || StringUtils.containsWhitespace(name) || existsIndex(name)) { | |
| return false; | |
| } | |
| if (shards <= 0) { | |
| shards = Config.getConfigInt("es.shards", 5); | |
| } | |
| if (replicas < 0) { | |
| replicas = Config.getConfigInt("es.replicas", 0); | |
| } | |
| try { | |
| Settings.Builder settings = Settings.builder(); | |
| settings.put("number_of_shards", Integer.toString(shards)); | |
| settings.put("number_of_replicas", Integer.toString(replicas)); | |
| settings.put("auto_expand_replicas", Config.getConfigParam("es.auto_expand_replicas", "0-1")); | |
| settings.put("analysis.analyzer.default.type", "standard"); | |
| settings.putList("analysis.analyzer.default.stopwords", | |
| "arabic", "armenian", "basque", "brazilian", "bulgarian", "catalan", | |
| "czech", "danish", "dutch", "english", "finnish", "french", "galician", | |
| "german", "greek", "hindi", "hungarian", "indonesian", "italian", | |
| "norwegian", "persian", "portuguese", "romanian", "russian", "spanish", | |
| "swedish", "turkish"); | |
| CreateIndexRequestBuilder create = getClient().admin().indices().prepareCreate(name). | |
| setSettings(settings.build()); | |
| // default system mapping (all the rest are dynamic) | |
| create.addMapping("paraobject", getDefaultMapping(), XContentType.JSON); | |
| create.execute().actionGet(); | |
| logger.info("Created a new index '{}' with {} shards, {} replicas.", name, shards, replicas); | |
| } catch (Exception e) { | |
| logger.warn(null, e); | |
| return false; | |
| } | |
| return true; | |
| } | |
| /** | |
| * Creates a new search index. | |
| * @param appid the index name (alias) | |
| * @return true if created | |
| */ | |
| public static boolean createIndex(String appid) { | |
| return createIndex(appid, Config.getConfigInt("es.shards", 5), Config.getConfigInt("es.replicas", 0)); | |
| } | |
| /** | |
| * Creates a new search index. | |
| * @param appid the index name (alias) | |
| * @param shards number of shards | |
| * @param replicas number of replicas | |
| * @return true if created | |
| */ | |
| public static boolean createIndex(String appid, int shards, int replicas) { | |
| if (StringUtils.isBlank(appid)) { | |
| return false; | |
| } | |
| String indexName = appid.trim() + "_1"; | |
| boolean created = createIndexWithoutAlias(indexName, shards, replicas); | |
| if (created) { | |
| boolean withAliasRouting = App.isRoot(appid) && Config.getConfigBoolean("es.root_index_sharing_enabled", false); | |
| boolean aliased = addIndexAlias(indexName, appid, withAliasRouting); | |
| if (!aliased) { | |
| logger.info("Created ES index '{}' without an alias '{}'.", indexName, appid); | |
| } else { | |
| logger.info("Created ES index '{}' with alias '{}'.", indexName, appid); | |
| } | |
| } | |
| return created; | |
| } | |
| /** | |
| * Deletes an existing search index. | |
| * @param appid the index name (alias) | |
| * @return true if deleted | |
| */ | |
| public static boolean deleteIndex(String appid) { | |
| if (StringUtils.isBlank(appid) || !existsIndex(appid)) { | |
| return false; | |
| } | |
| try { | |
| String indexName = getIndexNameWithWildcard(appid.trim()); | |
| logger.info("Deleted ES index '{}'.", indexName); | |
| getClient().admin().indices().prepareDelete(indexName).execute().actionGet(); | |
| } catch (Exception e) { | |
| logger.warn(null, e); | |
| return false; | |
| } | |
| return true; | |
| } | |
| /** | |
| * Checks if the index exists. | |
| * @param appid the index name (alias) | |
| * @return true if exists | |
| */ | |
| public static boolean existsIndex(String appid) { | |
| if (StringUtils.isBlank(appid)) { | |
| return false; | |
| } | |
| // don't assume false, might be distructive! | |
| boolean exists = true; | |
| try { | |
| String indexName = appid.trim(); | |
| exists = getClient().admin().indices().prepareExists(indexName).execute(). | |
| actionGet().isExists(); | |
| } catch (Exception e) { | |
| logger.warn(null, e); | |
| } | |
| return exists; | |
| } | |
| /** | |
| * Rebuilds an index. | |
| * Reads objects from the data store and indexes them in batches. | |
| * Works on one DB table and index only. | |
| * @param dao DAO for connecting to the DB - the primary data source | |
| * @param appid the index name (alias) | |
| * @param isShared is the app shared, controls index aliases and index switching | |
| * @param pager a Pager instance | |
| * @return true if successful, false if index doesn't exist or failed. | |
| */ | |
| public static boolean rebuildIndex(DAO dao, String appid, boolean isShared, Pager... pager) { | |
| if (StringUtils.isBlank(appid) || dao == null) { | |
| return false; | |
| } | |
| try { | |
| String indexName = appid.trim(); | |
| if (!isShared && !existsIndex(indexName)) { | |
| logger.info("Creating '{}' index because it doesn't exist.", indexName); | |
| createIndex(indexName); | |
| } | |
| String oldName = getIndexNameForAlias(indexName); | |
| String newName = indexName; | |
| if (!isShared) { | |
| newName = getNewIndexName(indexName, oldName); | |
| createIndexWithoutAlias(newName, -1, -1); // use defaults | |
| } | |
| logger.info("rebuildIndex(): {}", indexName); | |
| BulkRequestBuilder brb = getClient().prepareBulk(); | |
| BulkResponse resp; | |
| Pager p = getPager(pager); | |
| int batchSize = Config.getConfigInt("reindex_batch_size", p.getLimit()); | |
| long reindexedCount = 0; | |
| List<ParaObject> list; | |
| do { | |
| list = dao.readPage(appid, p); // use appid! | |
| logger.debug("rebuildIndex(): Read {} objects from table {}.", list.size(), indexName); | |
| for (ParaObject obj : list) { | |
| if (obj != null) { | |
| // put objects from DB into the newly created index | |
| brb.add(getClient().prepareIndex(newName, getType(), obj.getId()). | |
| setSource(getSourceFromParaObject(obj)).request()); | |
| // index in batches of ${queueSize} objects | |
| if (brb.numberOfActions() >= batchSize) { | |
| reindexedCount += brb.numberOfActions(); | |
| resp = brb.execute().actionGet(); | |
| logger.info("rebuildIndex(): indexed {}, failures: {}", | |
| brb.numberOfActions(), resp.hasFailures() ? resp.buildFailureMessage() : "false"); | |
| brb = getClient().prepareBulk(); | |
| } | |
| } | |
| } | |
| } while (!list.isEmpty()); | |
| // anything left after loop? index that too | |
| if (brb.numberOfActions() > 0) { | |
| reindexedCount += brb.numberOfActions(); | |
| resp = brb.execute().actionGet(); | |
| logger.info("rebuildIndex(): indexed {}, failures: {}", | |
| brb.numberOfActions(), resp.hasFailures() ? resp.buildFailureMessage() : "false"); | |
| } | |
| if (!isShared) { | |
| // switch to alias NEW_INDEX -> ALIAS, OLD_INDEX -> DELETE old index | |
| switchIndexToAlias(oldName, newName, indexName, true); | |
| } | |
| logger.info("rebuildIndex(): Done. {} objects reindexed.", reindexedCount); | |
| } catch (Exception e) { | |
| logger.warn(null, e); | |
| return false; | |
| } | |
| return true; | |
| } | |
| /** | |
| * @param pager an array of optional Pagers | |
| * @return the first {@link Pager} object in the array or a new Pager | |
| */ | |
| protected static Pager getPager(Pager[] pager) { | |
| return (pager != null && pager.length > 0) ? pager[0] : new Pager(); | |
| } | |
| /** | |
| * The {@code pager.sortBy} can contain comma-separated sort fields. For example "name,timestamp". | |
| * It can also contain sort orders for each field, for example: "name:asc,timestamp:desc". | |
| * @param pager a {@link Pager} object | |
| * @return a list of ES SortBuilder objects for sorting the results of a search request | |
| */ | |
| protected static List<SortBuilder<?>> getSortFieldsFromPager(Pager pager) { | |
| if (pager == null) { | |
| pager = new Pager(); | |
| } | |
| SortOrder defaultOrder = pager.isDesc() ? SortOrder.DESC : SortOrder.ASC; | |
| if (pager.getSortby().contains(",")) { | |
| String[] fields = pager.getSortby().split(","); | |
| ArrayList<SortBuilder<?>> sortFields = new ArrayList<>(fields.length); | |
| for (String field : fields) { | |
| SortOrder order; | |
| String fieldName; | |
| if (field.endsWith(":asc")) { | |
| order = SortOrder.ASC; | |
| fieldName = field.substring(0, field.indexOf(":asc")).trim(); | |
| } else if (field.endsWith(":desc")) { | |
| order = SortOrder.DESC; | |
| fieldName = field.substring(0, field.indexOf(":desc")).trim(); | |
| } else { | |
| order = defaultOrder; | |
| fieldName = field.trim(); | |
| } | |
| sortFields.add(SortBuilders.fieldSort(fieldName).order(order)); | |
| } | |
| return sortFields; | |
| } else { | |
| return Collections.singletonList(StringUtils.isBlank(pager.getSortby()) ? | |
| SortBuilders.scoreSort() : SortBuilders.fieldSort(pager.getSortby()).order(defaultOrder)); | |
| } | |
| } | |
| /** | |
| * Returns information about a cluster. | |
| * @return a map of key value pairs containing cluster information | |
| */ | |
| public static Map<String, NodeInfo> getSearchClusterInfo() { | |
| NodesInfoResponse res = getClient().admin().cluster().nodesInfo(new NodesInfoRequest().all()).actionGet(); | |
| return res.getNodesMap(); | |
| } | |
| /** | |
| * Adds a new alias to an existing index with routing and filtering by appid. | |
| * @param indexName the index name | |
| * @param aliasName the alias | |
| * @return true if acknowledged | |
| */ | |
| public static boolean addIndexAliasWithRouting(String indexName, String aliasName) { | |
| return addIndexAlias(indexName, aliasName, true); | |
| } | |
| /** | |
| * Adds a new alias to an existing index. | |
| * @param indexName the index name | |
| * @param aliasName the alias | |
| * @param withAliasRouting enables alias routing for index with filtering by appid | |
| * @return true if acknowledged | |
| */ | |
| public static boolean addIndexAlias(String indexName, String aliasName, boolean withAliasRouting) { | |
| if (StringUtils.isBlank(aliasName) || !existsIndex(indexName)) { | |
| return false; | |
| } | |
| try { | |
| String alias = aliasName.trim(); | |
| String index = getIndexNameWithWildcard(indexName.trim()); | |
| AliasActions aliasBuilder; | |
| if (withAliasRouting) { | |
| aliasBuilder = AliasActions.add().index(index).alias(alias). | |
| searchRouting(alias).indexRouting(alias). | |
| filter(termQuery(Config._APPID, aliasName)); // DO NOT trim filter query! | |
| } else { | |
| aliasBuilder = AliasActions.add().index(index).alias(alias); | |
| } | |
| return getClient().admin().indices().prepareAliases().addAliasAction(aliasBuilder). | |
| execute().actionGet().isAcknowledged(); | |
| } catch (Exception e) { | |
| logger.error(null, e); | |
| return false; | |
| } | |
| } | |
| /** | |
| * Removes an alias from an index. | |
| * @param indexName the index name | |
| * @param aliasName the alias | |
| * @return true if acknowledged | |
| */ | |
| public static boolean removeIndexAlias(String indexName, String aliasName) { | |
| if (StringUtils.isBlank(aliasName) || !existsIndex(indexName)) { | |
| return false; | |
| } | |
| String alias = aliasName.trim(); | |
| String index = getIndexNameWithWildcard(indexName.trim()); | |
| return getClient().admin().indices().prepareAliases().removeAlias(index, alias). | |
| execute().actionGet().isAcknowledged(); | |
| } | |
| /** | |
| * Checks if an index has a registered alias. | |
| * @param indexName the index name | |
| * @param aliasName the alias | |
| * @return true if alias is set on index | |
| */ | |
| public static boolean existsIndexAlias(String indexName, String aliasName) { | |
| if (StringUtils.isBlank(indexName) || StringUtils.isBlank(aliasName)) { | |
| return false; | |
| } | |
| String alias = aliasName.trim(); | |
| String index = getIndexNameWithWildcard(indexName.trim()); | |
| return getClient().admin().indices().prepareAliasesExist(index).addAliases(alias). | |
| execute().actionGet().exists(); | |
| } | |
| /** | |
| * Replaces the index to which an alias points with another index. | |
| * @param oldIndex the index name to be replaced | |
| * @param newIndex the new index name to switch to | |
| * @param alias the alias (unchanged) | |
| * @param deleteOld if true will delete the old index completely | |
| */ | |
| public static void switchIndexToAlias(String oldIndex, String newIndex, String alias, boolean deleteOld) { | |
| if (StringUtils.isBlank(oldIndex) || StringUtils.isBlank(newIndex) || StringUtils.isBlank(alias)) { | |
| return; | |
| } | |
| try { | |
| String aliaz = alias.trim(); | |
| String oldName = oldIndex.trim(); | |
| String newName = newIndex.trim(); | |
| logger.info("Switching index aliases {}->{}, deleting '{}': {}", aliaz, newIndex, oldIndex, deleteOld); | |
| getClient().admin().indices().prepareAliases(). | |
| addAlias(newName, aliaz). | |
| removeAlias(oldName, aliaz). | |
| execute().actionGet(); | |
| // delete the old index | |
| if (deleteOld) { | |
| deleteIndex(oldName); | |
| } | |
| } catch (Exception e) { | |
| logger.error(null, e); | |
| } | |
| } | |
| /** | |
| * Returns the real index name for a given alias. | |
| * @param appid the index name (alias) | |
| * @return the real index name (not alias) | |
| */ | |
| public static String getIndexNameForAlias(String appid) { | |
| if (StringUtils.isBlank(appid)) { | |
| return appid; | |
| } | |
| try { | |
| GetIndexResponse result = getClient().admin().indices().prepareGetIndex(). | |
| setIndices(appid).execute().actionGet(); | |
| if (result.indices() != null && result.indices().length > 0) { | |
| return result.indices()[0]; | |
| } | |
| } catch (Exception e) { | |
| logger.error(null, e); | |
| } | |
| return appid; | |
| } | |
| /** | |
| * @param appid the index name (alias) | |
| * @param oldName old index name | |
| * @return a new index name, e.g. "app_15698795757" | |
| */ | |
| static String getNewIndexName(String appid, String oldName) { | |
| if (StringUtils.isBlank(appid)) { | |
| return appid; | |
| } | |
| return (oldName.contains("_") ? oldName.substring(0, oldName.indexOf('_')) : appid) + "_" + Utils.timestamp(); | |
| } | |
| /** | |
| * @param <T> type of ES Response | |
| * @return a callback for handling ES response errors | |
| */ | |
| public static <T extends DocWriteResponse> ActionListener<T> getIndexResponseHandler() { | |
| return new ActionListener<T>() { | |
| public void onResponse(T response) { | |
| int status = response.status().getStatus(); | |
| if (status >= 400) { | |
| logger.warn("Indexing object {}/{} might have failed - status {}.", | |
| response.getIndex(), response.getId(), status); | |
| } | |
| } | |
| public void onFailure(Exception e) { | |
| logger.error("Indexing failure: {}", e); | |
| } | |
| }; | |
| } | |
| /** | |
| * @return a callback for handling ES response errors for bulk requests | |
| */ | |
| public static ActionListener<BulkResponse> getBulkIndexResponseHandler() { | |
| return new ActionListener<BulkResponse>() { | |
| public void onResponse(BulkResponse response) { | |
| int status = response.status().getStatus(); | |
| if (response.hasFailures() || status >= 400) { | |
| logger.warn("Indexing objects in bulk might have failed - status {}. Reason: {}", | |
| status, response.buildFailureMessage()); | |
| } | |
| } | |
| public void onFailure(Exception e) { | |
| logger.error("Bulk indexing failure: {}", e); | |
| } | |
| }; | |
| } | |
| /** | |
| * Check if cluster status is green or yellow. | |
| * @return false if status is red | |
| */ | |
| public static boolean isClusterOK() { | |
| return !getClient().admin().cluster().prepareClusterStats().execute().actionGet(). | |
| getStatus().equals(ClusterHealthStatus.RED); | |
| } | |
| /** | |
| * @return true if asynchronous indexing/unindexing is enabled. | |
| */ | |
| static boolean isAsyncEnabled() { | |
| return Config.getConfigBoolean("es.async_enabled", false); | |
| } | |
| /** | |
| * Creates a term filter for a set of terms. | |
| * @param terms some terms | |
| * @param mustMatchAll if true all terms must match ('AND' operation) | |
| * @return the filter | |
| */ | |
| static QueryBuilder getTermsQuery(Map<String, ?> terms, boolean mustMatchAll) { | |
| BoolQueryBuilder fb = boolQuery(); | |
| int addedTerms = 0; | |
| boolean noop = true; | |
| QueryBuilder bfb = null; | |
| for (Map.Entry<String, ?> term : terms.entrySet()) { | |
| Object val = term.getValue(); | |
| if (!StringUtils.isBlank(term.getKey()) && val != null && Utils.isBasicType(val.getClass())) { | |
| String stringValue = val.toString(); | |
| if (StringUtils.isBlank(stringValue)) { | |
| continue; | |
| } | |
| Matcher matcher = Pattern.compile(".*(<|>|<=|>=)$").matcher(term.getKey().trim()); | |
| if (matcher.matches()) { | |
| bfb = range(matcher.group(1), term.getKey(), stringValue); | |
| } else { | |
| if (nestedMode() && term.getKey().startsWith(PROPS_PREFIX)) { | |
| bfb = nestedPropsQuery(keyValueBoolQuery(term.getKey(), stringValue)); | |
| } else { | |
| bfb = termQuery(term.getKey(), stringValue); | |
| } | |
| } | |
| if (mustMatchAll) { | |
| fb.must(bfb); | |
| } else { | |
| fb.should(bfb); | |
| } | |
| addedTerms++; | |
| noop = false; | |
| } | |
| } | |
| if (addedTerms == 1 && bfb != null) { | |
| return bfb; | |
| } | |
| return noop ? null : fb; | |
| } | |
| /** | |
| * Tries to parse a query string in order to check if it is valid. | |
| * @param query a Lucene query string | |
| * @return the query if valid, or '*' if invalid | |
| */ | |
| static String qs(String query) { | |
| if (StringUtils.isBlank(query) || "*".equals(query.trim())) { | |
| return "*"; | |
| } | |
| query = query.trim(); | |
| if (query.length() > 1 && query.startsWith("*")) { | |
| query = query.substring(1); | |
| } | |
| try { | |
| StandardQueryParser parser = new StandardQueryParser(); | |
| parser.setAllowLeadingWildcard(false); | |
| parser.parse(query, ""); | |
| } catch (Exception ex) { | |
| logger.warn("Failed to parse query string '{}'.", query); | |
| query = "*"; | |
| } | |
| return query.trim(); | |
| } | |
| static Query qsParsed(String query) { | |
| if (StringUtils.isBlank(query) || "*".equals(query.trim())) { | |
| return null; | |
| } | |
| try { | |
| StandardQueryParser parser = new StandardQueryParser(); | |
| parser.setAllowLeadingWildcard(false); | |
| return parser.parse(query, ""); | |
| } catch (Exception ex) { | |
| logger.warn("Failed to parse query string '{}'.", query); | |
| } | |
| return null; | |
| } | |
| /** | |
| * Converts a {@link ParaObject} to a map of fields and values. | |
| * @param po an object | |
| * @return a map of keys and values | |
| */ | |
| @SuppressWarnings("unchecked") | |
| static Map<String, Object> getSourceFromParaObject(ParaObject po) { | |
| if (po == null) { | |
| return Collections.emptyMap(); | |
| } | |
| Map<String, Object> data = ParaObjectUtils.getAnnotatedFields(po, null, false); | |
| Map<String, Object> source = new HashMap<>(data.size() + 1); | |
| source.putAll(data); | |
| if (nestedMode() && po instanceof Sysprop) { | |
| try { | |
| Map<String, Object> props = (Map<String, Object>) data.get(PROPS_FIELD); | |
| // flatten properites object to array of keys/values, to prevent field mapping explosion | |
| List<Map<String, Object>> keysAndValues = getNestedProperties(props); | |
| source.put(PROPS_FIELD, keysAndValues); // overwrite properties object with flattened array | |
| // special field for holding the original sysprop.properties map as JSON string | |
| source.put(PROPS_JSON, ParaObjectUtils.getJsonWriterNoIdent().writeValueAsString(props)); | |
| } catch (Exception e) { | |
| logger.error(null, e); | |
| } | |
| } | |
| for (String field : IGNORED_FIELDS) { | |
| source.remove(field); | |
| } | |
| // special DOC ID field used in "search after" | |
| source.put("_docid", NumberUtils.toLong(Utils.getNewId())); | |
| return source; | |
| } | |
| /** | |
| * Flattens a complex object like a property Map ({@code Sysprop.getProperties()}) to a list of key/value pairs. | |
| * Rearranges properites to prevent field mapping explosion, for example: | |
| * properties: [{k: key1, v: value1}, {k: key2, v: value2}...] | |
| * @param objectData original object properties | |
| * @param keysAndValues a list of key/value objects, each containing one property | |
| * @param fieldPrefix a field prefix, e.g. "properties.key" | |
| */ | |
| @SuppressWarnings("unchecked") | |
| private static List<Map<String, Object>> getNestedProperties(Map<String, Object> objectData) { | |
| if (objectData == null || objectData.isEmpty()) { | |
| return Collections.emptyList(); | |
| } | |
| List<Map<String, Object>> keysAndValues = new LinkedList<>(); | |
| LinkedList<Map<String, Object>> stack = new LinkedList<>(); | |
| stack.add(Collections.singletonMap("", objectData)); | |
| while (!stack.isEmpty()) { | |
| Map<String, Object> singletonMap = stack.pop(); | |
| String prefix = singletonMap.keySet().iterator().next(); | |
| Object value = singletonMap.get(prefix); | |
| if (value != null) { | |
| if (value instanceof Map) { | |
| String pre = (StringUtils.isBlank(prefix) ? "" : prefix + "-"); | |
| for (Entry<String, Object> entry : ((Map<String, Object>) value).entrySet()) { | |
| addFieldToStack(pre + entry.getKey(), entry.getValue(), stack, keysAndValues); | |
| } | |
| } else { | |
| addFieldToStack(prefix, value, stack, keysAndValues); | |
| } | |
| } | |
| } | |
| return keysAndValues; | |
| } | |
| private static void addFieldToStack(String prefix, Object val, LinkedList<Map<String, Object>> stack, | |
| List<Map<String, Object>> keysAndValues) { | |
| if (val instanceof Map) { | |
| // flatten all nested objects | |
| stack.push(Collections.singletonMap(prefix, val)); | |
| } else if (val instanceof List) { | |
| // input array: key: [value1, value2] - [{k: key-0, v: value1}, {k: key-1, v: value2}] | |
| for (int i = 0; i < ((List) val).size(); i++) { | |
| stack.push(Collections.singletonMap(prefix + "-" + String.valueOf(i), ((List) val).get(i))); | |
| } | |
| } else { | |
| keysAndValues.add(getKeyValueField(prefix, val)); | |
| } | |
| } | |
| private static Map<String, Object> getKeyValueField(String field, Object value) { | |
| Map<String, Object> propMap = new HashMap<String, Object>(2); | |
| propMap.put("k", field); | |
| if (value instanceof Number) { | |
| propMap.put("vn", value); | |
| } else { | |
| // boolean and Date data types are ommited for simplicity | |
| propMap.put("v", String.valueOf(value)); | |
| } | |
| return propMap; | |
| } | |
| /** | |
| * Converts the source of an ES document to {@link ParaObject}. | |
| * @param <P> object type | |
| * @param source a map of keys and values coming from ES | |
| * @return a new ParaObject | |
| */ | |
| static <P extends ParaObject> P getParaObjectFromSource(Map<String, Object> source) { | |
| if (source == null) { | |
| return null; | |
| } | |
| Map<String, Object> data = new HashMap<>(source.size()); | |
| data.putAll(source); | |
| // retrieve the JSON for the original properties field and deserialize it | |
| if (nestedMode() && data.containsKey(PROPS_JSON)) { | |
| try { | |
| Map<String, Object> props = ParaObjectUtils.getJsonReader(Map.class). | |
| readValue((String) data.get(PROPS_JSON)); | |
| data.put(PROPS_FIELD, props); | |
| } catch (Exception e) { | |
| logger.error(null, e); | |
| } | |
| data.remove(PROPS_JSON); | |
| } | |
| data.remove("_docid"); | |
| return ParaObjectUtils.setAnnotatedFields(data); | |
| } | |
| /** | |
| * @param operator operator <,>,<=,>= | |
| * @param field field name | |
| * @param stringValue field value | |
| * @return a range query | |
| */ | |
| static QueryBuilder range(String operator, String field, String stringValue) { | |
| String key = StringUtils.replaceAll(field, "[<>=\\s]+$", ""); | |
| boolean nestedMode = nestedMode() && field.startsWith(PROPS_PREFIX); | |
| RangeQueryBuilder rfb = rangeQuery(nestedMode ? getValueFieldName(stringValue) : key); | |
| if (">".equals(operator)) { | |
| rfb.gt(getNumericValue(stringValue)); | |
| } else if ("<".equals(operator)) { | |
| rfb.lt(getNumericValue(stringValue)); | |
| } else if (">=".equals(operator)) { | |
| rfb.gte(getNumericValue(stringValue)); | |
| } else if ("<=".equals(operator)) { | |
| rfb.lte(getNumericValue(stringValue)); | |
| } | |
| if (nestedMode) { | |
| return nestedPropsQuery(keyValueBoolQuery(key, stringValue, rfb)); | |
| } else { | |
| return rfb; | |
| } | |
| } | |
| /** | |
| * @param query query string | |
| * @return a list of composite queries for matching nested objects | |
| */ | |
| static QueryBuilder convertQueryStringToNestedQuery(String query) { | |
| String queryStr = StringUtils.trimToEmpty(query).replaceAll("\\[(\\d+)\\]", "-$1"); | |
| Query q = qsParsed(queryStr); | |
| if (q == null) { | |
| return matchAllQuery(); | |
| } | |
| try { | |
| return rewriteQuery(q, 0); | |
| } catch (Exception e) { | |
| logger.warn(e.getMessage()); | |
| return null; | |
| } | |
| } | |
| /** | |
| * @param q parsed Lucene query string query | |
| * @return a rewritten query with nested queries for custom properties (when in nested mode) | |
| */ | |
| private static QueryBuilder rewriteQuery(Query q, int depth) throws IllegalAccessException { | |
| if (depth > MAX_QUERY_DEPTH) { | |
| throw new IllegalArgumentException("`Query depth exceeded! Max depth: " + MAX_QUERY_DEPTH); | |
| } | |
| QueryBuilder qb = null; | |
| if (q instanceof BooleanQuery) { | |
| qb = boolQuery(); | |
| for (BooleanClause clause : ((BooleanQuery) q).clauses()) { | |
| switch (clause.getOccur()) { | |
| case MUST: | |
| ((BoolQueryBuilder) qb).must(rewriteQuery(clause.getQuery(), depth++)); | |
| break; | |
| case MUST_NOT: | |
| ((BoolQueryBuilder) qb).mustNot(rewriteQuery(clause.getQuery(), depth++)); | |
| break; | |
| case FILTER: | |
| ((BoolQueryBuilder) qb).filter(rewriteQuery(clause.getQuery(), depth++)); | |
| break; | |
| case SHOULD: | |
| default: | |
| ((BoolQueryBuilder) qb).should(rewriteQuery(clause.getQuery(), depth++)); | |
| } | |
| } | |
| } else if (q instanceof TermRangeQuery) { | |
| qb = termRange(q); | |
| } else if (q instanceof BoostQuery) { | |
| qb = rewriteQuery(((BoostQuery) q).getQuery(), depth++).boost(((BoostQuery) q).getBoost()); | |
| } else if (q instanceof TermQuery) { | |
| qb = term(q); | |
| } else if (q instanceof FuzzyQuery) { | |
| qb = fuzzy(q); | |
| } else if (q instanceof PrefixQuery) { | |
| qb = prefix(q); | |
| } else if (q instanceof WildcardQuery) { | |
| qb = wildcard(q); | |
| } else { | |
| logger.warn("Unknown query type in nested mode query syntax: {}", q.getClass()); | |
| } | |
| return (qb == null) ? matchAllQuery() : qb; | |
| } | |
| private static QueryBuilder termRange(Query q) { | |
| QueryBuilder qb = null; | |
| TermRangeQuery trq = (TermRangeQuery) q; | |
| if (!StringUtils.isBlank(trq.getField())) { | |
| String from = trq.getLowerTerm() != null ? Term.toString(trq.getLowerTerm()) : "*"; | |
| String to = trq.getUpperTerm() != null ? Term.toString(trq.getUpperTerm()) : "*"; | |
| boolean nestedMode = nestedMode() && trq.getField().matches(PROPS_REGEX); | |
| qb = rangeQuery(nestedMode ? getValueFieldNameFromRange(from, to) : trq.getField()); | |
| if ("*".equals(from) && "*".equals(to)) { | |
| qb = matchAllQuery(); | |
| } | |
| if (!"*".equals(from)) { | |
| ((RangeQueryBuilder) qb).from(getNumericValue(from)).includeLower(trq.includesLower()); | |
| } | |
| if (!"*".equals(to)) { | |
| ((RangeQueryBuilder) qb).to(getNumericValue(to)).includeUpper(trq.includesUpper()); | |
| } | |
| if (nestedMode) { | |
| qb = nestedPropsQuery(keyValueBoolQuery(trq.getField(), qb)); | |
| } | |
| } | |
| return qb; | |
| } | |
| private static QueryBuilder term(Query q) { | |
| QueryBuilder qb; | |
| String field = ((TermQuery) q).getTerm().field(); | |
| String value = ((TermQuery) q).getTerm().text(); | |
| if (StringUtils.isBlank(field)) { | |
| qb = multiMatchQuery(value); | |
| } else if (nestedMode() && field.matches(PROPS_REGEX)) { | |
| qb = nestedPropsQuery(keyValueBoolQuery(field, value)); | |
| } else { | |
| qb = termQuery(field, value); | |
| } | |
| return qb; | |
| } | |
| private static QueryBuilder fuzzy(Query q) { | |
| QueryBuilder qb; | |
| String field = ((FuzzyQuery) q).getTerm().field(); | |
| String value = ((FuzzyQuery) q).getTerm().text(); | |
| if (StringUtils.isBlank(field)) { | |
| qb = multiMatchQuery(value); | |
| } else if (nestedMode() && field.matches(PROPS_REGEX)) { | |
| qb = nestedPropsQuery(keyValueBoolQuery(field, fuzzyQuery(getValueFieldName(value), value))); | |
| } else { | |
| qb = fuzzyQuery(field, value); | |
| } | |
| return qb; | |
| } | |
| private static QueryBuilder prefix(Query q) { | |
| QueryBuilder qb; | |
| String field = ((PrefixQuery) q).getPrefix().field(); | |
| String value = ((PrefixQuery) q).getPrefix().text(); | |
| if (StringUtils.isBlank(field)) { | |
| qb = multiMatchQuery(value); | |
| } else if (nestedMode() && field.matches(PROPS_REGEX)) { | |
| qb = nestedPropsQuery(keyValueBoolQuery(field, prefixQuery(getValueFieldName(value), value))); | |
| } else { | |
| qb = prefixQuery(field, value); | |
| } | |
| return qb; | |
| } | |
| private static QueryBuilder wildcard(Query q) { | |
| QueryBuilder qb; | |
| String field = ((WildcardQuery) q).getTerm().field(); | |
| String value = ((WildcardQuery) q).getTerm().text(); | |
| if (StringUtils.isBlank(field)) { | |
| qb = multiMatchQuery(value); | |
| } else if (nestedMode() && field.matches(PROPS_REGEX)) { | |
| qb = nestedPropsQuery(keyValueBoolQuery(field, wildcardQuery(getValueFieldName(value), value))); | |
| } else { | |
| qb = wildcardQuery(field, value); | |
| } | |
| return qb; | |
| } | |
| /** | |
| * @param k field name | |
| * @param query query object | |
| * @return a composite query: bool(match(key) AND match(value)) | |
| */ | |
| static QueryBuilder keyValueBoolQuery(String k, QueryBuilder query) { | |
| return keyValueBoolQuery(k, null, query); | |
| } | |
| /** | |
| * @param k field name | |
| * @param v field value | |
| * @return a composite query: bool(match(key) AND match(value)) | |
| */ | |
| static QueryBuilder keyValueBoolQuery(String k, String v) { | |
| return keyValueBoolQuery(k, v, null); | |
| } | |
| /** | |
| * @param k field name | |
| * @param v field value | |
| * @param query query object | |
| * @return a composite query: bool(match(key) AND match(value)) | |
| */ | |
| static QueryBuilder keyValueBoolQuery(String k, String v, QueryBuilder query) { | |
| if (StringUtils.isBlank(k) || (query == null && StringUtils.isBlank(v))) { | |
| return matchAllQuery(); | |
| } | |
| QueryBuilder kQuery = matchQuery(PROPS_PREFIX + "k", getNestedKey(k)); | |
| QueryBuilder vQuery = (query == null) ? matchQuery(getValueFieldName(v), v) : query; | |
| if ("*".equals(v) || matchAllQuery().equals(query)) { | |
| return boolQuery().must(kQuery); | |
| } | |
| return boolQuery().must(kQuery).must(vQuery); | |
| } | |
| /** | |
| * @param query query | |
| * @return a nested query | |
| */ | |
| static NestedQueryBuilder nestedPropsQuery(QueryBuilder query) { | |
| return nestedQuery(PROPS_FIELD, query, Avg); | |
| } | |
| /** | |
| * @param key dotted field path | |
| * @return translate "properties.path.to.key" to "properties.path-to-key" | |
| */ | |
| static String getNestedKey(String key) { | |
| if (StringUtils.startsWith(key, PROPS_PREFIX)) { | |
| return StringUtils.removeStart(key, PROPS_PREFIX).replaceAll("\\[(\\d+)\\]", "-$1").replaceAll("\\.", "-"); | |
| } | |
| return key; | |
| } | |
| /** | |
| * @param v search term | |
| * @return the name of the value property inside a nested object, e.g. "properties.v" | |
| */ | |
| static String getValueFieldName(String v) { | |
| return PROPS_PREFIX + (NumberUtils.isDigits(v) ? "vn" : "v"); | |
| } | |
| /** | |
| * @param from from value | |
| * @param to to value | |
| * @return either "properties.vn" if one of the range limits is a number, or "properties.v" otherwise. | |
| */ | |
| static String getValueFieldNameFromRange(String from, String to) { | |
| if (("*".equals(from) && "*".equals(to)) || NumberUtils.isDigits(from) || NumberUtils.isDigits(to)) { | |
| return PROPS_PREFIX + "vn"; | |
| } | |
| return PROPS_PREFIX + "v"; | |
| } | |
| /** | |
| * @param v search term | |
| * @return the long value of v if it is a number | |
| */ | |
| static Object getNumericValue(String v) { | |
| return NumberUtils.isDigits(v) ? NumberUtils.toLong(v, 0) : v; | |
| } | |
| /** | |
| * A method reserved for future use. It allows to have indexes with different names than the appid. | |
| * | |
| * @param appid an app identifer | |
| * @return the correct index name | |
| */ | |
| static String getIndexName(String appid) { | |
| return appid.trim(); | |
| } | |
| /** | |
| * Para indices have 1 type only - "paraobject". From v6 onwards, ES allows only 1 type per index. | |
| * @return "paraobject" | |
| */ | |
| static String getType() { | |
| return ParaObject.class.getSimpleName().toLowerCase(); | |
| } | |
| /** | |
| * @param indexName index name or alias | |
| * @return e.g. "index-name_*" | |
| */ | |
| static String getIndexNameWithWildcard(String indexName) { | |
| return StringUtils.contains(indexName, "_") ? indexName : indexName + "_*"; // ES v6 | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment