@geekpete
Forked from mrflip/20130416-todo.md
Last active August 29, 2015 14:13

Next Steps

  • Measure time spent on index, flush, refresh, merge, query, etc. (TD - done)
  • Take hot threads snapshots under read+write, read-only, write-only (TD - done)
  • Adjust refresh time to 10s (from 1s) and see how load changes (TD)
  • Measure time of a rolling restart doing disable_flush and disable_recovery (TD)
  • Specify routing on query -- make it choose same node for each shard each time (MD)
  • GC new generation size (TD)
  • Warmers
    • measure before/after of client query time with and without warmers (MD)
    • check the impact of frequent segment generation (MD+TD)
  • look at the index.reclaim_deletes_weight (TD)
  • ES has no current way to delay recovery. File an issue (PK4)
  • ES will not currently re-plan recovery. File an issue (PK4)

And then

Not doing

  • mmapfs vs niofs
  • read node / write node topology
  • partial-cluster indexes
  • throttling -- this won't help us, as our index load is non-bursty.

Define a Standard Query Assault Package

  • Write a script that re-issues every slowlog query against the database in order. It should emit the current and original timestamp; the duration then; the reported and wall-clock durations now; the index and shard; the original node; the reported hits; the size in bytes of the response; and the first 50 characters of the query. (DONE)
    • later: add an option that would set cache=false for each query
  • Time the SQAP against a cold cluster that is not receiving writes, against a hot cluster that is receiving writes, and against a hot cluster with reads and writes.
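
The replay loop described above could look roughly like this. It is a minimal sketch, not the script marked DONE: it assumes the slowlog has already been parsed into dicts, the key names (`timestamp`, `took_ms`, `index`, `shard`, `node`, `query`) are hypothetical, and `send` is a caller-supplied function that issues the query and returns the reported duration, hit count and response size.

```python
import time
from typing import Callable, Dict, Iterable, Iterator, Tuple

# Hypothetical record layout: these keys are assumptions for illustration,
# not the real slowlog format.
SlowlogEntry = Dict[str, object]
# send(entry) -> (reported_took_ms, hits, response_bytes); supplied by the caller.
Sender = Callable[[SlowlogEntry], Tuple[float, int, int]]


def replay(entries: Iterable[SlowlogEntry], send: Sender,
           clock: Callable[[], float] = time.time) -> Iterator[Dict[str, object]]:
    """Re-issue each entry in order and yield one result row per query."""
    for entry in entries:
        started = clock()
        reported_ms, hits, size_bytes = send(entry)
        wall_ms = (clock() - started) * 1000.0
        yield {
            "now": started,
            "original_timestamp": entry["timestamp"],
            "took_ms_then": entry["took_ms"],
            "took_ms_reported_now": reported_ms,
            "took_ms_wall_now": wall_ms,
            "index": entry["index"],
            "shard": entry["shard"],
            "original_node": entry["node"],
            "hits": hits,
            "response_bytes": size_bytes,
            "query_head": str(entry["query"])[:50],  # first 50 characters only
        }
```

Keeping `send` and `clock` injectable means the same loop can run against the cold, write-only and read+write clusters, and can be dry-run without a cluster at all.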

Warmers

  • put warmers in place, such that the Standard Query Assault
  • while under write load, compare query latency with warmers on to query latency without.

Lower refresh time

The refresh time is much faster than it needs to be for the CacheMap-backing indexes. Those only need GETs to be fast, and GETs don't need refresh to be current. (Verify this is true)
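
A sketch of the settings change, assuming the per-index `index.refresh_interval` setting is updated through the index `_settings` endpoint. The host and the index name `cachemap` are placeholders, and the request is only built here, not sent.

```python
import json
import urllib.request


def refresh_interval_request(base_url: str, index: str, interval: str) -> urllib.request.Request:
    """Build (but do not send) a settings update that changes the refresh interval."""
    body = json.dumps({"index": {"refresh_interval": interval}}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/{index}/_settings",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Placeholder host and index name (assumptions); send with
# urllib.request.urlopen(req) once the GET behavior has been verified.
req = refresh_interval_request("http://localhost:9200", "cachemap", "10s")
```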

Field Data

  • Become able to fully populate the fielddata buffers -- using warmers is best, a script is OK too. (DONE)
  • Make it so we can monitor the fielddata size; fixing bigdesk is best, a script that prints its value to console is better than nothing. (DONE)
  • Make a plot of fielddata size vs. document count
    • verify that document size is generally irrelevant as long as cardinality is low.
    • demonstrate the thing where one bad apple in a mapping with multi-value fields can blow the lid off the index. Is it possible to cap the maximum number of values in a multi-value field?

Thoughtfully apply the new 0.90 fielddata options to the existing mappings:

  • pulsing, bloom
    • make a script that dumps cardinality of fields given a JSON dump
    • identify fields with high or very high cardinality; see what happens when those are set to pulsing+bloom (high) or bloom (very high)
  • disable field data
    • identify fields that are uninteresting to facet on, and disable fielddata
    • find out what happens when you facet on a field with disabled fielddata anyway
  • I think it's unlikely that we'll want the ones that are more aggressive, but they exist.
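
One possible shape for the cardinality-dump script mentioned above. This sketch assumes the JSON dump is one document per line (an assumption about the dump format), flattens nested objects into dotted field paths, and counts each element of a multi-value field separately.

```python
import json
from collections import defaultdict
from typing import Dict, Iterable, Iterator, Set, Tuple


def _leaves(value, path: str = "") -> Iterator[Tuple[str, str]]:
    """Yield (dotted_field_path, value_as_json) for every leaf in a document."""
    if isinstance(value, dict):
        for key, child in value.items():
            yield from _leaves(child, f"{path}.{key}" if path else key)
    elif isinstance(value, list):
        for child in value:  # multi-value field: each element counts separately
            yield from _leaves(child, path)
    else:
        yield path, json.dumps(value)


def field_cardinality(lines: Iterable[str]) -> Dict[str, int]:
    """Count distinct values per field across newline-delimited JSON documents."""
    seen: Dict[str, Set[str]] = defaultdict(set)
    for line in lines:
        if line.strip():  # skip blank lines
            for field, encoded in _leaves(json.loads(line)):
                seen[field].add(encoded)
    return {field: len(values) for field, values in seen.items()}
```

This keeps every distinct value in memory, so it is exact but only suitable for dumps that fit in RAM; a sampled dump is probably enough to separate low, high and very high cardinality fields.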


In Recovery for Recovery Addiction

  • Quantify the time to recover a shard vs. size of shard.
  • Play with forced assignment of shards.
    • in particular: rsync the contents of a shard from one machine to another machine not assigned to that shard. Force assign the machine to the shard. Does it recover with integrity?
    • rsync as before, then cause a couple new segments to be generated (by sending some writes & flushing). Force assign the machine to the shard. Does it recover with integrity?
  • Pound the cluster with queries that will hold it at max CPU (but not so many a backlog starts forming). Now hard kill one (then two) nodes, and bring them back up. How long until the cluster has the full complement of nodes serving data? What was the impact on query performance?
  • In any way you can, try to make elasticsearch fail in recovery. Some ideas:
    • artificially degrade network traffic
    • regicide: write a script that watches the logs, and as soon as a master is elected, kills it. Now also start sniping other nodes. Can you get the cluster to a point where it refuses to assign shards?
    • ...


Best Practices for Mappings

We already do the following:

  • Disable the 'all' field
  • Set dynamic_mapping so that unexpected fields cause an error

Should do/investigate the following:

  • set ignore_malformed true
  • Precision on numbers and dates: If we don't ever want to range query a number -- ids and such -- what do we gain by disabling the 'precision' fields?
  • set a default 'ignore above' (max recorded length) on strings to some medium-large value, perhaps 1kB. Fields that may actually have large bodies should announce the fact.


Durability / DR

  • EBS volume snapshotting
    • make this run as a nightly job (?on each machine?)
    • do a flush before snapshotting; consider pausing flushes during.
  • Using the JSON-dump backups, measure how long it takes to do a full recovery into the new version


JVM Tuning

GC Tuning

What Happens

Objects are born in the "Eden space".

There are two survivor spaces, alternately labeled as 'from' and 'to'. Each new-gen GC does the following:

  1. Copy all still-relevant objects from the eden space and the from survivor space to the to survivor space.
  2. The eden and from survivor spaces are now empty
  3. Switch labels on the from and to survivor spaces

Later, old-gen GCs happen. Our goal is to simply provide enough RAM that those are uninteresting.

What you want:

  • the new-gen GCs should not last longer than 50 ms or happen more often than every 10 seconds
  • the old-gen GCs should not last longer than 1 second or happen more often than every 10 minutes.
  • the steady-state old gen occupancy to have reasonable overhead (I've left "reasonable" to be defined by risk tolerance)
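
The first two targets can be checked mechanically. A minimal sketch, assuming pause events have already been extracted as (timestamp, duration) pairs in seconds; it does not parse GC logs, and it treats "how often" as the mean interval between events, which is one interpretation among several.

```python
from typing import List, Sequence, Tuple

# (seconds since JVM start, pause duration in seconds). Extracting these from a
# GC log is left out on purpose, since the log format depends on the flags used.
Pause = Tuple[float, float]


def check_pauses(pauses: Sequence[Pause], max_pause_s: float,
                 min_interval_s: float) -> List[str]:
    """Return human-readable violations of the pause and frequency targets."""
    problems = []
    if pauses:
        longest = max(duration for _, duration in pauses)
        if longest > max_pause_s:
            problems.append(f"longest pause {longest:.3f}s exceeds {max_pause_s:.3f}s")
    if len(pauses) >= 2:
        times = sorted(t for t, _ in pauses)
        mean_gap = (times[-1] - times[0]) / (len(times) - 1)
        if mean_gap < min_interval_s:
            problems.append(f"mean interval {mean_gap:.1f}s is under {min_interval_s:.1f}s")
    return problems


def check_new_gen(pauses: Sequence[Pause]) -> List[str]:
    """New-gen target: at most 50 ms, no more often than every 10 seconds."""
    return check_pauses(pauses, max_pause_s=0.050, min_interval_s=10.0)


def check_old_gen(pauses: Sequence[Pause]) -> List[str]:
    """Old-gen target: at most 1 second, no more often than every 10 minutes."""
    return check_pauses(pauses, max_pause_s=1.0, min_interval_s=600.0)
```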

Approach

  • Turn on MAX VERBOSITY for gc's: -verbose:gc -XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:+PrintTenuringDistribution -XX:+PrintGCTimeStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDateStamps -Xloggc:/path/to/apps-gc.log
  • Verify (so we can never think of it again) that the perm size is large enough, and not too large as to waste space. Stuff in there is permanent (classes, etc) so there shouldn't be significant GC activity unless you're getting fancy with metaprogramming.
  • Make the total new space large enough that almost all objects at time of collection are garbage, as shown by the tenuring distribution.
    • watch the size distribution of tenured objects. Increase the new size until there is a big rolloff from the eden to first survivor state.
    • the new size must never be more than 50% of the old-gen size -- you have to be able to tenure out.
    • As you go, verify that the Survivor space is always decently large enough for what it holds.
    • Set the NewSize and MaxNewSize to the same value (you'll know why to set them different if you ever do)
  • Tighten up the survivor spaces to be large enough to never overflow, but not piggish
    • -XX:SurvivorRatio sets the ratio of the Eden space to one of the survivor spaces. For instance, a value of 4 would divide the spaces up as 4+1+1 -- a 600m new size gives 400m eden and two 100m survivor spaces.
    • -XX:MaxTenuringThreshold sets the number of times an object bounces to one of the survivor spaces before being promoted to the old generation. This just fine-tunes the pause time of new-gen GCs: it slightly smears out & extends the object lifetime. The most likely values here are actually 0 or 1; if zero, you must set the now-irrelevant survivor sizes to tiny (by setting SurvivorRatio to like 1000).
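
The SurvivorRatio arithmetic above, as a small helper. This is a sketch of the nominal split only; the sizes the JVM actually picks may differ slightly because of alignment.

```python
from typing import Tuple


def new_gen_layout(new_size_mb: float, survivor_ratio: int) -> Tuple[float, float]:
    """Split the new generation into (eden_mb, each_survivor_mb).

    SurvivorRatio=R means eden is R times one survivor space, so the new
    generation divides into R + 2 equal parts: R for eden, 1 per survivor.
    """
    part = new_size_mb / (survivor_ratio + 2)
    return survivor_ratio * part, part


eden, survivor = new_gen_layout(600, 4)
print(f"eden={eden:.0f}m survivor={survivor:.0f}m x2")  # eden=400m survivor=100m x2
```

With SurvivorRatio at 1000 the same arithmetic shows why the survivor spaces become negligible: each one gets 1/1002 of the new size.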

Java 7

  • Java 7 offers significant performance improvements. We'd like to upgrade the ES nodes at least.
    • issue one: this will be a fiddly chef recipe to write
    • issue two: Hadoop and HBase are NOT yet fully Java-7-ok-for-production-use, so machines containing either will have to run Java 6. So the actual move is from Java 6 to Java sometimes-6-sometimes-7.
  • If Elasticsearch uses one of Java 7's new GC strategies -- different than the ParNewGC we use now -- that's important to know before we do a bunch of GC tuning on Java 6.

Summary of JVM Arguments

  • -Xms11000m -Xmx11000m Starting heap, Max heap; always set these to the same value (in this case, 11GB)

  • -Xss64k set this to a small value, or there will be memory pressure from threads

  • New-gen settings, described above: -XX:NewSize=, -XX:MaxNewSize=, -XX:SurvivorRatio= -XX:MaxTenuringThreshold=.

  • A couple settings for processes that 'own' the machine:

    • -XX:+UseCompressedOops -- makes object references more efficient on a 64-bit machine
    • -XX:+AggressiveOpts -- Any parts of the GC that are auto-tuned will act aggressively (good).
    • -Dsun.rmi.dgc.client.gcInterval=3600000 -Dsun.rmi.dgc.server.gcInterval=3600000 -- don't let it trigger a certain kind of inessential GC frequently.
  • We like this combination of GC strategies:

    -XX:+UseParNewGC 
    -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled
    -XX:CMSInitiatingOccupancyFraction=75
    -XX:+UseCMSInitiatingOccupancyOnly
    
  • JMX options: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port= -Djava.rmi.server.hostname= -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false

  • -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/raid0/elasticsearch/scratch/heap -- if there is an OOM, dump the heap. We need to check how long the heap dump on out-of-memory takes; it might be a bad idea.

  • -Djava.net.preferIPv4Stack=true -- use IPv4, not IPv6, by default.

TODO now

  • these should be harmless and helpful; add them to the JVM line:

    # Settings for a machine when this is what it does
    -server -XX:+AggressiveOpts -XX:+UseCompressedOops
    # Don't let the RMI (JMX) interface invoke forced-GC too frequently
    -Dsun.rmi.dgc.client.gcInterval=3600000 -Dsun.rmi.dgc.server.gcInterval=3600000
    # Tell me about your world
    -XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal
    # JMX
    -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port= -Djava.rmi.server.hostname= -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false
    
  • Note that the first two pieces of the JMX lines are missing; add them

  • Check that everything that is in the line now is documented above

TODO after GC tuning

Enable these but thoughtfully:

  • -XX:+DisableExplicitGC prevents anyone from forcing GC. There should be no difference whether this is on or off; check that it's so, then leave it out (i.e. do not disable explicit GC)
  • -XX:+UseLargePages -XX:LargePageSizeInBytes
  • -XX:+UseTLAB
  • -XX:+AlwaysPreTouch -- JVM touches every page on initialization, thus getting them all in memory before entering main(), at cost of increasing your startup time. The AlwaysPreTouch flag is useful in testing when simulating a long running system which has all the virtual memory already mapped into physical memory.
  • (the settings that cause it to give up on too-much or too-frequent GC behavior)

References

Topology (Not Pursuing)

  • Read node / write node -- split the cluster into front nodes (hi-mem, connect to customer) and rear nodes (hi-cpu, only used by Storm).

    • Set allocation to place, for every shard, exactly one of the replicas on a write node (the primary, if possible).
    • read nodes: small write buffer, aggressive
  • indexes shouldn't span whole cluster -- when the shards in each index span the whole cluster, a toxic query will crash all the nodes all at once. If each index only spans half the cluster, you're significantly less likely to bring all the nodes down at once.

    • This will increase asymmetry of access, though, and it's essential that there be one more machine per shard group than the replication factor -- if your replication factor is 3, you must have 4 or more nodes. Shard groups should be sized by the larger of ceil(N/2)-1 machines and replication factor + 1 (that's replicas + 2).
    • Define subgroups of nodes, and use allocation to pin each index to one node subgroup. For eight nodes I'd use groups of four, defining subgroups of a=1-2-3-4, b=1-2-5-6, c=3-4-5-6, d=1-2-7-8, e=3-4-7-8, f=5-6-7-8. For ten nodes I'd also use groups of four, completing the list with g=1-2-9-10, h=3-4-9-10, i=5-6-9-10, j=7-8-9-10.
    • the read-node / write-node split may make this less interesting -- as long as read nodes crash independently of write nodes, it's easier to maintain cluster continuity.
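
The group-size rule and the subgroup lists above can be generated rather than written out by hand. A sketch, assuming an even node count and the same scheme as the examples: pair adjacent nodes (1-2, 3-4, ...) and combine every two pairs into a group of four. The groups come out in a different order than the a..j lettering above.

```python
from itertools import combinations
from math import ceil
from typing import List, Tuple


def group_size(node_count: int, replication_factor: int) -> int:
    """Larger of ceil(N/2) - 1 and replication factor + 1, as stated above."""
    return max(ceil(node_count / 2) - 1, replication_factor + 1)


def subgroups(node_count: int) -> List[Tuple[int, ...]]:
    """Pair adjacent nodes, then combine every two pairs into a group of four.

    Assumes node_count is even; with an odd count the last node is left out.
    """
    pairs = [(n, n + 1) for n in range(1, node_count, 2)]
    return [a + b for a, b in combinations(pairs, 2)]


for group in subgroups(8):
    print("-".join(str(node) for node in group))
```

For eight nodes this yields the six groups a through f; for ten nodes it yields ten groups, adding the four that contain nodes 9 and 10.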

Store Type (Not Pursuing)

  • store type -- the default is niofs. Need to understand risk-reward of mmapfs (which might be high)
  • Store-level throttling
  • Figure out right value for index interval term_index_interval and term_index_divisor.


Recovery (Not Pursuing)

  • What are the implications of auto_expand_replicas?
  • Is there a setting to delay recovery initiation -- to extend the time between a node falling down and when the cluster starts recovering its data?
  • Find out how to make the cluster stop recovery if a better candidate shows up. That is: node Alice falls down; recovery starts from Bob to Charlie for shard X; node Alice comes up, with full or mostly-full copy of X. Unless Charlie is pretty far along, Alice should become the recovery target for shard X. (This might already be fixed. Or it might be currently-impossible)

Performance Qualification

Identify all reasons why (eg) Elasticsearch cannot provide acceptable performance for standard requests and Qualifying load. The "Qualifying load" for each performance bound is double (or so) the worst-case scenario for that bound across all our current clients.

Measure performance along the following dimensions:

  • bandwidth (rec/s, MB/s) and latency (s) for 100B, 1kB, 100kB records
  • under read, write, read/write
  • in degraded state: a) loss of one server and recovering; b) loss of two servers and recovering; c) elevated packet latency + drop rate between "regions"
  • High concurrency: connect with 10,000 simultaneous clients, each asking for something simple.
  • while receiving a flood of bad inputs

General experiments:

  • Capture commonly observed errors, explain them into knowledgebase
  • get a set of exemplars and mountweasels you can use to test system. Should have some with extreme characteristics (huge-sized string; all values same; all values different; etc)
  • how long does it take from event to full health again after a) restart of service; b) reboot of machine; c) stop/start of machine; d) kill -9 of service + restart
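
For the event-to-full-health timing, a polling loop is probably enough. A minimal sketch: `fetch_status` is a hypothetical caller-supplied function that returns the current cluster status string (for Elasticsearch, presumably the `status` field of the cluster health response), and "full health" is assumed to mean `green`.

```python
import time
from typing import Callable


def seconds_until_green(fetch_status: Callable[[], str],
                        poll_s: float = 1.0,
                        timeout_s: float = 3600.0,
                        clock: Callable[[], float] = time.monotonic,
                        sleep: Callable[[float], None] = time.sleep) -> float:
    """Poll until fetch_status() returns "green"; return the elapsed seconds.

    fetch_status is an assumption of this sketch; it should also decide how to
    treat connection errors while the service is still down.
    """
    started = clock()
    while True:
        if fetch_status() == "green":
            return clock() - started
        if clock() - started >= timeout_s:
            raise TimeoutError(f"cluster not green after {timeout_s}s")
        sleep(poll_s)
```

Start the timer at the moment of the event (restart, reboot, stop/start, kill -9) so the four cases are comparable; the resolution is limited by `poll_s`.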

Elasticsearch

  • Five queries everyone should know
    • their performance at baseline
  • Field Cache usage vs number of records
  • Write throughput in a) full-weight records; b) Cache map use case (lots of deletes on compaction)
  • Shard assignment
    • Cross-geo replication?
  • Machine sizes: ebs optimized vs not; for write nodes, c1.xl vs m3.xl vs. m1.large; for read nodes, m1.x vs m3.xl
  • Test Failover and backup

Storm

  • CacheMap metrics, tuning
  • In-stream database calls
  • Can I "push"/"flush" DRPC calls?
  • What happens when I fail a tuple?
    • fail-forever / fail-retriably
    • "failure" streams
  • Tracing
    • "tracing" stream
  • Wukong shim
    • failure/error handling
    • tuple vs record
    • serialization
  • Batch size tradeoffs