Last active
November 3, 2017 02:49
-
-
Save nsoft/397da0bafbf4f59acd793218472c3710 to your computer and use it in GitHub Desktop.
SOLR-11487-watch.patch -- Patch vs master as of ~9pm Nov 2 2017
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Index: solr/core/src/java/org/apache/solr/cloud/CreateAliasCmd.java | |
IDEA additional info: | |
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP | |
<+>UTF-8 | |
=================================================================== | |
--- solr/core/src/java/org/apache/solr/cloud/CreateAliasCmd.java (date 1509475828000) | |
+++ solr/core/src/java/org/apache/solr/cloud/CreateAliasCmd.java (date 1508764667000) | |
@@ -21,9 +21,7 @@ | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Locale; | |
-import java.util.Objects; | |
import java.util.Set; | |
-import java.util.concurrent.TimeUnit; | |
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd; | |
import org.apache.solr.common.SolrException; | |
@@ -32,8 +30,6 @@ | |
import org.apache.solr.common.cloud.ZkStateReader; | |
import org.apache.solr.common.util.NamedList; | |
import org.apache.solr.common.util.StrUtils; | |
-import org.apache.solr.util.TimeOut; | |
-import org.apache.zookeeper.KeeperException; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
@@ -57,20 +53,7 @@ | |
ZkStateReader zkStateReader = ocmh.zkStateReader; | |
validateAllCollectionsExistAndNoDups(collections, zkStateReader); | |
- byte[] jsonBytes = zkStateReader.getAliases().cloneWithCollectionAlias(aliasName, collections).toJSON(); | |
- try { | |
- zkStateReader.getZkClient().setData(ZkStateReader.ALIASES, jsonBytes, true); | |
- | |
- checkForAlias(aliasName, collections); | |
- // some fudge for other nodes | |
- Thread.sleep(100); | |
- } catch (KeeperException e) { | |
- log.error("", e); | |
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); | |
- } catch (InterruptedException e) { | |
- log.warn("", e); | |
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); | |
- } | |
+ zkStateReader.exportAliasToZk(aliasName, collections); | |
} | |
private void validateAllCollectionsExistAndNoDups(String collections, ZkStateReader zkStateReader) { | |
@@ -89,18 +72,5 @@ | |
} | |
} | |
- private void checkForAlias(String name, String value) { | |
- TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS); | |
- boolean success = false; | |
- while (!timeout.hasTimedOut()) { | |
- String collections = ocmh.zkStateReader.getAliases().getCollectionAliasMap().get(name); | |
- if (Objects.equals(collections, value)) { | |
- success = true; | |
- break; | |
- } | |
- } | |
- if (!success) { | |
- log.warn("Timeout waiting to be notified of Alias change..."); | |
- } | |
- } | |
+ | |
} | |
Index: solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java | |
IDEA additional info: | |
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP | |
<+>UTF-8 | |
=================================================================== | |
--- solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java (date 1509475828000) | |
+++ solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java (date 1508764667000) | |
@@ -17,6 +17,8 @@ | |
package org.apache.solr.cloud; | |
import java.io.IOException; | |
+import java.util.List; | |
+import java.util.Map; | |
import java.util.function.Consumer; | |
import org.apache.solr.client.solrj.SolrQuery; | |
@@ -28,6 +30,9 @@ | |
import org.apache.solr.client.solrj.request.UpdateRequest; | |
import org.apache.solr.client.solrj.response.QueryResponse; | |
import org.apache.solr.common.SolrException; | |
+import org.apache.solr.common.cloud.Aliases; | |
+import org.apache.solr.common.cloud.SolrZkClient; | |
+import org.apache.solr.common.cloud.ZkStateReader; | |
import org.apache.solr.common.params.ModifiableSolrParams; | |
import org.apache.solr.common.params.SolrParams; | |
import org.junit.BeforeClass; | |
@@ -41,6 +46,95 @@ | |
.addConfig("conf", configset("cloud-minimal")) | |
.configure(); | |
} | |
+ | |
+ /** | |
+ * Exercises alias metadata end to end: creates two collections and an alias over them, then adds | |
+ * and removes metadata keys via cloneWithCollectionAliasMetadata followed by exportAllAliases, and | |
+ * finally re-reads the aliases through a second, independently constructed ZkStateReader. | |
+ */ | |
+ @Test | |
+ public void testMetadata() throws Exception { | |
+ CollectionAdminRequest.createCollection("collection1meta", "conf", 2, 1).process(cluster.getSolrClient()); | |
+ CollectionAdminRequest.createCollection("collection2meta", "conf", 1, 1).process(cluster.getSolrClient()); | |
+ waitForState("Expected collection1 to be created with 2 shards and 1 replica", "collection1meta", clusterShape(2, 1)); | |
+ waitForState("Expected collection2 to be created with 1 shard and 1 replica", "collection2meta", clusterShape(1, 1)); | |
+ ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader(); | |
+ zkStateReader.createClusterStateWatchersAndUpdate(); | |
+ // before the alias exists, resolving its name is expected to yield just the name itself | |
+ List<String> aliases = zkStateReader.getAliases().resolveAliases("meta1"); | |
+ assertEquals(1, aliases.size()); | |
+ assertEquals("meta1", aliases.get(0)); | |
+ zkStateReader.exportAllAliases(zkStateReader.getAliases().cloneWithCollectionAlias("meta1", "collection1meta,collection2meta")); | |
+ aliases = zkStateReader.getAliases().resolveAliases("meta1"); | |
+ assertEquals(2, aliases.size()); | |
+ assertEquals("collection1meta", aliases.get(0)); | |
+ assertEquals("collection2meta", aliases.get(1)); | |
+ | |
+ // set metadata | |
+ Aliases cloned = zkStateReader.getAliases().cloneWithCollectionAliasMetadata("meta1", "foo", "bar"); | |
+ zkStateReader.exportAllAliases(cloned); | |
+ Map<String, String> meta = zkStateReader.getAliases().getCollectionAliasMetadata("meta1"); | |
+ assertNotNull(meta); | |
+ assertTrue(meta.containsKey("foo")); | |
+ assertEquals("bar", meta.get("foo")); | |
+ | |
+ // set more metadata | |
+ cloned = zkStateReader.getAliases().cloneWithCollectionAliasMetadata("meta1", "foobar", "bazbam"); | |
+ zkStateReader.exportAllAliases(cloned); | |
+ meta = zkStateReader.getAliases().getCollectionAliasMetadata("meta1"); | |
+ assertNotNull(meta); | |
+ | |
+ // old metadata still there | |
+ assertTrue(meta.containsKey("foo")); | |
+ assertEquals("bar", meta.get("foo")); | |
+ | |
+ // new metadata added | |
+ assertTrue(meta.containsKey("foobar")); | |
+ assertEquals("bazbam", meta.get("foobar")); | |
+ | |
+ // remove metadata | |
+ cloned = zkStateReader.getAliases().cloneWithCollectionAliasMetadata("meta1", "foo", null); | |
+ zkStateReader.exportAllAliases(cloned); | |
+ meta = zkStateReader.getAliases().getCollectionAliasMetadata("meta1"); | |
+ assertNotNull(meta); | |
+ | |
+ // verify key was removed | |
+ assertFalse(meta.containsKey("foo")); | |
+ | |
+ // but only the specified key was removed | |
+ assertTrue(meta.containsKey("foobar")); | |
+ assertEquals("bazbam", meta.get("foobar")); | |
+ | |
+ // removal of non existent key should succeed. | |
+ cloned = zkStateReader.getAliases().cloneWithCollectionAliasMetadata("meta1", "foo", null); | |
+ zkStateReader.exportAllAliases(cloned); | |
+ | |
+ // now check that an independently constructed ZkStateReader can see what we've done. | |
+ // i.e. the data is really in zookeeper | |
+ String zkAddress = cluster.getZkServer().getZkAddress(); | |
+ boolean createdZKSR = false; | |
+ try(SolrZkClient zkClient = new SolrZkClient(zkAddress, 30000)) { | |
+ | |
+ ZkController.createClusterZkNodes(zkClient); | |
+ | |
+ zkStateReader = new ZkStateReader(zkClient); | |
+ createdZKSR = true; | |
+ zkStateReader.createClusterStateWatchersAndUpdate(); | |
+ | |
+ meta = zkStateReader.getAliases().getCollectionAliasMetadata("meta1"); | |
+ assertNotNull(meta); | |
+ | |
+ // verify key was removed in independent view | |
+ assertFalse(meta.containsKey("foo")); | |
+ | |
+ // but only the specified key was removed | |
+ assertTrue(meta.containsKey("foobar")); | |
+ assertEquals("bazbam", meta.get("foobar")); | |
+ | |
+ // removing the alias itself must also drop its metadata; this is checked on the in-memory | |
+ // clone only, the clone is not exported | |
+ Aliases a = zkStateReader.getAliases(); | |
+ Aliases clone = a.cloneWithCollectionAlias("meta1", null); | |
+ meta = clone.getCollectionAliasMetadata("meta1"); | |
+ assertEquals(0,meta.size()); | |
+ } finally { | |
+ // only close the reader this test constructed itself, never the one obtained from the cluster client | |
+ if (createdZKSR) { | |
+ zkStateReader.close(); | |
+ } | |
+ } | |
+ } | |
@Test | |
public void test() throws Exception { | |
Index: solr/solrj/src/java/org/apache/solr/common/cloud/Aliases.java | |
IDEA additional info: | |
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP | |
<+>UTF-8 | |
=================================================================== | |
--- solr/solrj/src/java/org/apache/solr/common/cloud/Aliases.java (date 1509475828000) | |
+++ solr/solrj/src/java/org/apache/solr/common/cloud/Aliases.java (date 1509666567000) | |
@@ -33,24 +33,43 @@ | |
*/ | |
public class Aliases { | |
- public static final Aliases EMPTY = new Aliases(Collections.emptyMap()); | |
+ // need to be able to test this with == in constructor, can't use Collections.emptyMap() | |
+ @SuppressWarnings("unchecked") | |
+ public static final Aliases EMPTY = new Aliases(Collections.EMPTY_MAP); | |
+ private static final String COLLECTION_METADATA = "collection_metadata"; | |
+ private static final String COLLECTION = "collection"; | |
/** Map of "collection" string constant to -> | |
* alias name -> comma delimited list of collections */ | |
- private final Map<String,Map<String,String>> aliasMap; // not-null | |
+ private final Map<String,Map> aliasMap; // not-null | |
- private final Map<String, List<String>> collectionAliasListMap; // not-null; computed from aliasMap | |
+ // aliasName --> metadataKey --> metadataValue | |
+ private Map<String, Map<String, String>> collectionAliasMetadata; | |
+ | |
+ @SuppressWarnings("unchecked") | |
public static Aliases fromJSON(byte[] bytes) { | |
if (bytes == null || bytes.length == 0) { | |
return EMPTY; | |
} | |
- return new Aliases((Map<String,Map<String,String>>) Utils.fromJSON(bytes)); | |
+ return new Aliases((Map) Utils.fromJSON(bytes)); | |
} | |
- private Aliases(Map<String, Map<String,String>> aliasMap) { | |
+ @SuppressWarnings("unchecked") | |
+ private Aliases(Map<String, Map> aliasMap) { | |
this.aliasMap = aliasMap; | |
- collectionAliasListMap = convertMapOfCommaDelimitedToMapOfList(getCollectionAliasMap()); | |
+ if (aliasMap.size() == 0) { | |
+ aliasMap = Collections.EMPTY_MAP; | |
+ } | |
+ this.collectionAliasMetadata = this.aliasMap.get(COLLECTION_METADATA); | |
+ if (aliasMap != Collections.EMPTY_MAP) { | |
+ if (collectionAliasMetadata == null) { | |
+ // bootstrap it in if it doesn't exist. | |
+ Map<String, Map<String, String>> newMap = new HashMap<>(); | |
+ this.aliasMap.put(COLLECTION_METADATA, newMap); | |
+ this.collectionAliasMetadata = newMap; | |
+ } | |
+ } | |
} | |
public static Map<String, List<String>> convertMapOfCommaDelimitedToMapOfList(Map<String, String> collectionAliasMap) { | |
@@ -66,21 +85,57 @@ | |
* Does not return null. | |
* Prefer use of {@link #getCollectionAliasListMap()} instead, where appropriate. | |
*/ | |
+ @SuppressWarnings("unchecked") | |
public Map<String,String> getCollectionAliasMap() { | |
- Map<String,String> cam = aliasMap.get("collection"); | |
- return cam == null ? Collections.emptyMap() : Collections.unmodifiableMap(cam); | |
+ Map<String, String> cam = aliasMap.get(COLLECTION); | |
+ if (cam == null) { | |
+ return Collections.emptyMap(); | |
+ } else { | |
+ HashMap<String, String> stringMap = new HashMap<>(); | |
+ for (String alias : cam.keySet()) { | |
+ stringMap.put(alias, cam.get(alias)); | |
+ } | |
+ return Collections.unmodifiableMap(stringMap); | |
+ } | |
} | |
/** | |
* Returns an unmodifiable Map of collection aliases mapped to a list of what the alias maps to. | |
* Does not return null. | |
*/ | |
+ @SuppressWarnings("unchecked") | |
public Map<String,List<String>> getCollectionAliasListMap() { | |
- return Collections.unmodifiableMap(collectionAliasListMap); | |
+ Map listMap = aliasMap.get(COLLECTION); | |
+ if (listMap == null) { | |
+ return Collections.EMPTY_MAP; | |
+ } | |
+ // need to also ensure the list inside the map can't be changed, but don't want to convert lists in | |
+ // source map to immutable, so clone the source map first, then replace the values with immutable lists. | |
+ listMap = convertMapOfCommaDelimitedToMapOfList(listMap); | |
+ for (Object alias : listMap.keySet()) { | |
+ listMap.put(alias,Collections.unmodifiableList((List)listMap.get(alias))); | |
+ } | |
+ return Collections.unmodifiableMap(listMap); | |
+ } | |
+ | |
+ /** | |
+ * Returns an unmodifiable Map of metadata for a given alias. This method never returns null: | |
+ * an empty map is returned when no metadata is stored under the given name. | |
+ * | |
+ * @param alias the name of an alias also found as a key in {@link #getCollectionAliasListMap()} | |
+ * @return The metadata for the alias, or an empty map if there is none for that name. | |
+ */ | |
+ public Map<String,String> getCollectionAliasMetadata(String alias) { | |
+ if (collectionAliasMetadata == null) { | |
+ return Collections.emptyMap(); | |
+ } | |
+ Map<String, String> map = collectionAliasMetadata.get(alias); | |
+ return map == null ? Collections.emptyMap() : Collections.unmodifiableMap(map); | |
} | |
- | |
+ | |
public boolean hasCollectionAliases() { | |
- return !collectionAliasListMap.isEmpty(); | |
+ Map map = aliasMap.get(COLLECTION); | |
+ return map != null && !map.isEmpty(); | |
} | |
/** | |
@@ -89,7 +144,11 @@ | |
* Treat the result as unmodifiable. | |
*/ | |
public List<String> resolveAliases(String aliasName) { | |
- return resolveAliasesGivenAliasMap(collectionAliasListMap, aliasName); | |
+ Map collectionAliasMap = aliasMap.get(COLLECTION); | |
+ if (collectionAliasMap == null) { | |
+ return Collections.singletonList(aliasName); | |
+ } | |
+ return resolveAliasesGivenAliasMap(convertMapOfCommaDelimitedToMapOfList(collectionAliasMap), aliasName); | |
} | |
/** @lucene.internal */ | |
@@ -97,6 +156,9 @@ | |
//return collectionAliasListMap.getOrDefault(aliasName, Collections.singletonList(aliasName)); | |
// TODO deprecate and remove this dubious feature? | |
// Due to another level of indirection, this is more complicated... | |
+ if (collectionAliasListMap == null) { | |
+ return Collections.singletonList(aliasName); | |
+ } | |
List<String> level1 = collectionAliasListMap.get(aliasName); | |
if (level1 == null) { | |
return Collections.singletonList(aliasName);// is a collection | |
@@ -115,25 +177,101 @@ | |
/** | |
* Creates a new Aliases instance with the same data as the current one but with a modification based on the | |
- * parameters. If {@code collections} is null, then the {@code alias} is removed, otherwise it is added/updated. | |
+ * parameters. | |
+ * <p> | |
+ * Note that the state in zookeeper is unaffected by this method and the change must still be persisted via | |
+ * {@link ZkStateReader#exportAllAliases(Aliases)} | |
+ * | |
+ * @param alias the alias to update, must not be null | |
+ * @param collections the comma separated list of collections for the alias, null to remove the alias | |
*/ | |
+ @SuppressWarnings("unchecked") | |
public Aliases cloneWithCollectionAlias(String alias, String collections) { | |
- Map<String,String> newCollectionMap = new HashMap<>(getCollectionAliasMap()); | |
+ if (alias == null) { | |
+ throw new NullPointerException("Alias name cannot be null"); | |
+ } | |
+ Aliases newAliases = cloneAliases(); // to avoid mutating the object we were given. | |
if (collections == null) { | |
- newCollectionMap.remove(alias); | |
+ newAliases.aliasMap.get(COLLECTION_METADATA).remove(alias); | |
+ newAliases.aliasMap.get(COLLECTION).remove(alias); | |
} else { | |
- newCollectionMap.put(alias, collections); | |
+ newAliases.aliasMap.get(COLLECTION).put(alias, collections); | |
} | |
- if (newCollectionMap.isEmpty()) { | |
+ if (!newAliases.hasCollectionAliases()) { | |
return EMPTY; | |
} else { | |
- return new Aliases(Collections.singletonMap("collection", newCollectionMap)); | |
+ return newAliases; | |
} | |
} | |
+ | |
+ /** | |
+ * Produces a copy of these aliases in which one metadata entry of a collection alias has been | |
+ * set or removed. This instance is left untouched. | |
+ * <p> | |
+ * The copy only exists in memory; to store it in zookeeper it must still be passed to | |
+ * {@link ZkStateReader#exportAllAliases(Aliases)} | |
+ * | |
+ * @param alias the alias whose metadata is modified; must name an existing collection alias | |
+ * @param metadataKey the metadata key to set or remove; must not be null | |
+ * @param metadataValue the value to store under the key, or null to remove the key | |
+ * @return a modified copy of the aliases | |
+ * @throws IllegalArgumentException if the alias is null or unknown, or if the key is null | |
+ */ | |
+ @SuppressWarnings("unchecked") | |
+ public Aliases cloneWithCollectionAliasMetadata(String alias, String metadataKey, String metadataValue){ | |
+ boolean knownAlias = alias != null && getCollectionAliasMap().get(alias) != null; | |
+ if (!knownAlias) { | |
+ throw new IllegalArgumentException(alias + " is not a valid alias"); | |
+ } | |
+ if (metadataKey == null) { | |
+ throw new IllegalArgumentException("Null is not a valid metadata key"); | |
+ } | |
+ | |
+ // all changes are applied to a deep copy | |
+ Aliases copy = cloneAliases(); | |
+ Map metadataByAlias = copy.aliasMap.get(COLLECTION_METADATA); | |
+ // the first metadata entry for an alias creates its metadata map on demand | |
+ Map<String, String> entries = (Map<String, String>) metadataByAlias.computeIfAbsent(alias, k -> new HashMap<>()); | |
+ if (metadataValue == null) { | |
+ entries.remove(metadataKey); | |
+ } else { | |
+ entries.put(metadataKey, metadataValue); | |
+ } | |
+ return copy; | |
+ } | |
+ | |
+ /** | |
+ * Builds an independent copy of this object: the collection alias map, the metadata map and | |
+ * every per-alias metadata map are all freshly allocated. | |
+ * | |
+ * @return a new Aliases sharing no mutable maps with this instance. | |
+ */ | |
+ @SuppressWarnings("unchecked") | |
+ private Aliases cloneAliases() { | |
+ //TODO: think about modeling this with objects not collections if this gets any more complex... | |
+ // Always start from fresh HashMaps so that a missing entry (or the shared EMPTY_MAP) never | |
+ // ends up in the copy; callers clone precisely because they want to add something. | |
+ Map<String, String> aliasCopy = new HashMap<>(); | |
+ Map currentAliases = aliasMap.get(COLLECTION); | |
+ if (currentAliases != null) { | |
+ aliasCopy.putAll(currentAliases); | |
+ } | |
+ | |
+ Map<String, Map<String, String>> metadataCopy = new HashMap<>(); | |
+ Map<String, Map<String, String>> currentMetadata = aliasMap.get(COLLECTION_METADATA); | |
+ if (currentMetadata != null) { | |
+ // copy each per-alias map as well, otherwise the clone would only be shallow | |
+ for (Map.Entry<String, Map<String, String>> entry : currentMetadata.entrySet()) { | |
+ metadataCopy.put(entry.getKey(), new HashMap<>(entry.getValue())); | |
+ } | |
+ } | |
+ | |
+ HashMap<String, Map> root = new HashMap<>(); | |
+ root.put(COLLECTION, aliasCopy); | |
+ root.put(COLLECTION_METADATA, metadataCopy); | |
+ return new Aliases(root); | |
+ } | |
/** Serialize to ZooKeeper. */ | |
public byte[] toJSON() { | |
- if (collectionAliasListMap.isEmpty()) { | |
+ if (!hasCollectionAliases()) { | |
return null; | |
} else { | |
return Utils.toJSON(aliasMap); | |
Index: solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java | |
IDEA additional info: | |
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP | |
<+>UTF-8 | |
=================================================================== | |
--- solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (date 1509475828000) | |
+++ solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (date 1509674445000) | |
@@ -162,6 +162,8 @@ | |
private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches"); | |
+ private final AliasWatcher aliasWatcher = new AliasWatcher(); | |
+ | |
/** | |
* Get current {@link AutoScalingConfig}. | |
* @return current configuration from <code>autoscaling.json</code>. NOTE: | |
@@ -193,6 +195,70 @@ | |
return new AutoScalingConfig(map); | |
} | |
+ /** | |
+ * Writes an alias to zk, and waits for up to 30 seconds to see that the new alias value | |
+ * is available before returning. | |
+ * | |
+ * @param aliasName The alias to store | |
+ * @param collections The collections that are part of the alias | |
+ * @throws SolrException with SERVER_ERROR if zookeeper reports an error or the thread is interrupted | |
+ */ | |
+ public void exportAliasToZk(String aliasName, String collections) { | |
+ Aliases aliases = getAliases().cloneWithCollectionAlias(aliasName, collections); | |
+ try { | |
+ // TODO: worries me that aliases can't be independently updated... | |
+ // exportAllAliases already installs the new Aliases as the current view while holding the | |
+ // update lock. It must not be assigned again after the wait below: by then the watcher may | |
+ // have loaded a newer version, which an unlocked late assignment would silently overwrite. | |
+ exportAllAliases(aliases); | |
+ checkForAlias(aliasName, collections); | |
+ // some fudge for other nodes | |
+ Thread.sleep(100); | |
+ } catch (KeeperException e) { | |
+ LOG.error("", e); | |
+ throw new SolrException(ErrorCode.SERVER_ERROR, e); | |
+ } catch (InterruptedException e) { | |
+ // Restore the interrupted status so code further up the stack can still observe it | |
+ Thread.currentThread().interrupt(); | |
+ LOG.warn("", e); | |
+ throw new SolrException(ErrorCode.SERVER_ERROR, e); | |
+ } | |
+ } | |
+ | |
+ /** | |
+ * Persists the given aliases to zookeeper and installs them as this reader's current aliases. | |
+ * Both steps are performed while holding the update lock. | |
+ * | |
+ * @param aliases the aliases to write and to adopt as the current view | |
+ * @throws KeeperException if the write to zookeeper fails | |
+ * @throws InterruptedException if the thread is interrupted while talking to zookeeper | |
+ */ | |
+ public void exportAllAliases(Aliases aliases) throws KeeperException, InterruptedException { | |
+ synchronized (getUpdateLock()) { | |
+ // write first; the local view is only switched once setData has returned normally | |
+ final byte[] serialized = aliases.toJSON(); | |
+ getZkClient().setData(ALIASES, serialized, true); | |
+ this.aliases = aliases; | |
+ } | |
+ } | |
+ | |
+ /** | |
+ * Blocks for up to 30 seconds until this reader's aliases map the given alias name to the | |
+ * given value, and logs a warning if that did not happen in time. | |
+ * | |
+ * @param name the alias name to look up | |
+ * @param value the expected comma delimited collection list, or null if the alias should be absent | |
+ * @throws InterruptedException if the thread is interrupted while waiting | |
+ */ | |
+ private void checkForAlias(String name, String value) throws InterruptedException { | |
+ final boolean[] success = {false}; | |
+ AliasCondition updateVisible = new AliasCondition() { | |
+ @Override | |
+ public boolean test(Aliases aliases) { | |
+ String collections = aliases.getCollectionAliasMap().get(name); | |
+ if (Objects.equals(collections, value)) { | |
+ success[0] = true; | |
+ this.latch.countDown(); | |
+ return true; | |
+ } | |
+ return false; | |
+ } | |
+ }; | |
+ this.aliasWatcher.addCondition(updateVisible); | |
+ try { | |
+ updateVisible.await(30, TimeUnit.SECONDS); | |
+ } finally { | |
+ // Always deregister: after a timeout or an interrupt the condition would otherwise stay in | |
+ // the watcher's list and be re-evaluated on every alias change. Removing a condition that | |
+ // is no longer in the list is a no-op. | |
+ synchronized (this.aliasWatcher.conditions) { | |
+ this.aliasWatcher.conditions.remove(updateVisible); | |
+ } | |
+ } | |
+ if (!success[0]) { | |
+ LOG.warn("Timeout waiting to be notified of Alias change..."); | |
+ } | |
+ } | |
+ | |
+ | |
+ | |
+ | |
private static class CollectionWatch { | |
int coreRefCount = 0; | |
@@ -386,7 +452,12 @@ | |
refreshLiveNodes(null); | |
} | |
- /** Never null. */ | |
+ /** | |
+ * Get an immutable copy of the present state of the aliases. References to this object should not be retained | |
+ * in any context where it will be important to know if aliases have changed. | |
+ * | |
+ * @return The current aliases, Aliases.EMPTY if not solr cloud, or no aliases have existed yet. Never returns null. | |
+ */ | |
public Aliases getAliases() { | |
assert aliases != null; | |
return aliases; | |
@@ -436,45 +507,7 @@ | |
refreshLegacyClusterState(new LegacyClusterStateWatcher()); | |
refreshStateFormat2Collections(); | |
refreshCollectionList(new CollectionsChildWatcher()); | |
- | |
- synchronized (ZkStateReader.this.getUpdateLock()) { | |
- constructState(Collections.emptySet()); | |
- | |
- zkClient.exists(ALIASES, | |
- new Watcher() { | |
- | |
- @Override | |
- public void process(WatchedEvent event) { | |
- // session events are not change events, and do not remove the watcher | |
- if (EventType.None.equals(event.getType())) { | |
- return; | |
- } | |
- try { | |
- synchronized (ZkStateReader.this.getUpdateLock()) { | |
- LOG.debug("Updating aliases... "); | |
- | |
- // remake watch | |
- final Watcher thisWatch = this; | |
- final Stat stat = new Stat(); | |
- final byte[] data = zkClient.getData(ALIASES, thisWatch, stat, true); | |
- ZkStateReader.this.aliases = Aliases.fromJSON(data); | |
- LOG.debug("New alias definition is: " + ZkStateReader.this.aliases.toString()); | |
- } | |
- } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) { | |
- LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); | |
- } catch (KeeperException e) { | |
- LOG.error("A ZK error has occurred", e); | |
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e); | |
- } catch (InterruptedException e) { | |
- // Restore the interrupted status | |
- Thread.currentThread().interrupt(); | |
- LOG.warn("Interrupted", e); | |
- } | |
- } | |
- | |
- }, true); | |
- } | |
- updateAliases(); | |
+ refreshAliases(this.aliasWatcher); | |
if (securityNodeListener != null) { | |
addSecuritynodeWatcher(pair -> { | |
@@ -488,6 +521,14 @@ | |
} | |
} | |
+ /** | |
+ * Registers the given watcher on the aliases node and then loads the aliases. | |
+ * | |
+ * @param watcher the watcher to set on the aliases node | |
+ * @throws KeeperException if a zookeeper call fails | |
+ * @throws InterruptedException if the thread is interrupted during a zookeeper call | |
+ */ | |
+ private void refreshAliases(AliasWatcher watcher) throws KeeperException, InterruptedException { | |
+ final Object updateLock = getUpdateLock(); | |
+ synchronized (updateLock) { | |
+ constructState(Collections.emptySet()); | |
+ // exists() is used so the watch can be set whether or not the node is present yet | |
+ zkClient.exists(ALIASES, watcher, true); | |
+ } | |
+ // performed outside the update lock, as in the code this method was extracted from | |
+ updateAliases(); | |
+ } | |
+ | |
private void addSecuritynodeWatcher(final Callable<Pair<byte[], Stat>> callback) | |
throws KeeperException, InterruptedException { | |
zkClient.exists(SOLR_SECURITY_CONF_PATH, | |
@@ -1473,4 +1514,74 @@ | |
} | |
+ /** | |
+ * A class to watch for changes to aliases. There should only ever be one instance of this class | |
+ * per instance of ZkStateReader. Normally it will not be useful to create a new instance since | |
+ * this watcher automatically re-registers itself every time it is updated. | |
+ */ | |
+ private class AliasWatcher implements Watcher { | |
+ | |
+ private final List<AliasCondition> conditions = new ArrayList<>(); | |
+ | |
+ @Override | |
+ public void process(WatchedEvent event) { | |
+ // session events are not change events, and do not remove the watcher | |
+ if (EventType.None.equals(event.getType())) { | |
+ return; | |
+ } | |
+ try { | |
+ synchronized (ZkStateReader.this.getUpdateLock()) { | |
+ LOG.debug("Updating aliases... "); | |
+ | |
+ // re-register the watch | |
+ final byte[] data = zkClient.getData(ALIASES, this, null, true); | |
+ Aliases aliases = Aliases.fromJSON(data); | |
+ ZkStateReader.this.aliases = aliases; | |
+ LOG.debug("New alias definition is: " + ZkStateReader.this.aliases.toString()); | |
+ synchronized (conditions) { | |
+ // removeIf drops satisfied conditions safely; calling | |
+ // conditions.remove(...) inside a for-each loop over | |
+ // the same list throws ConcurrentModificationException | |
+ // or silently skips the element after the removed one. | |
+ conditions.removeIf(condition -> condition.test(aliases)); | |
+ } | |
+ } | |
+ } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) { | |
+ LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); | |
+ } catch (KeeperException e) { | |
+ LOG.error("A ZK error has occurred", e); | |
+ throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e); | |
+ } catch (InterruptedException e) { | |
+ // Restore the interrupted status | |
+ Thread.currentThread().interrupt(); | |
+ LOG.warn("Interrupted", e); | |
+ } | |
+ } | |
+ | |
+ void addCondition(AliasCondition condition) { | |
+ synchronized (conditions) { | |
+ if (!condition.test(ZkStateReader.this.aliases)) { | |
+ conditions.add(condition); | |
+ } | |
+ } | |
+ } | |
+ } | |
+ | |
+ /** | |
+ * A condition on the alias state that a thread can block on. Implementations decide in | |
+ * {@link #test(Aliases)} whether the condition holds and release waiters by counting down | |
+ * the latch. | |
+ */ | |
+ private abstract class AliasCondition { | |
+ // final: the waiting thread and the thread running test() must always share the same latch | |
+ final CountDownLatch latch = new CountDownLatch(1); | |
+ | |
+ /** | |
+ * Waits until the latch has been counted down or the timeout elapses, whichever comes first. | |
+ * | |
+ * @param timeout the maximum time to wait | |
+ * @param units the unit of {@code timeout} | |
+ * @throws InterruptedException if the waiting thread is interrupted | |
+ */ | |
+ public void await(long timeout, TimeUnit units) throws InterruptedException { | |
+ latch.await(timeout, units); | |
+ } | |
+ | |
+ /** | |
+ * Perform some test on the aliases. Typical implementations should count down the latch and | |
+ * return true if the test succeeds. | |
+ * | |
+ * @param aliases the aliases object to check. | |
+ * @return true if the desired condition is met and the latch has been counted down. | |
+ */ | |
+ abstract boolean test(Aliases aliases); | |
+ } | |
+ | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment