@toff63
Last active January 10, 2017 06:50

Elasticsearch

What is it?

A document-oriented NoSQL store and search engine that stores and retrieves documents in near real time. It can also be seen as a distributed cluster built on top of Lucene, a full-text search library written in Java.

What is the ES cluster model?

Master/slave architecture. A cluster usually has 3 master-eligible nodes, with only one acting as master at any time. Client nodes coordinate requests to the cluster, and data nodes actually hold the data.

What are the master node's responsibilities?

Shard allocation

It can be based on:

  • Cluster level shard allocation: balance shards across all data nodes
  • Fault tolerance (avoid having 2 replicas of the same shard on the same node for example)
  • Disk based shard allocation
  • Shard allocation awareness based on racks and AZ
  • Shard allocation filtering to be able to decommission nodes

Master acts on node joining/leaving

Every node has a list of seed nodes in its configuration.

  1. Connect to one of the seed nodes
  2. Retrieve the cluster status from it
  3. Send a join to the master
  4. Master updates the cluster status

Zen discovery

It uses unicast TCP connections to discover and ping every node, and deduces node health from those pings. Only the master can update the cluster state.

Master election

Any non-client node can be configured as master eligible. An election only happens when a minimum number of master-eligible nodes participate. By default, only master-eligible nodes and data nodes can participate in the election.

What happens in a data node?

Every index is composed of shards. Those shards are pieces of the index, and each one is actually a Lucene engine. Each shard has a primary copy and one or several replicas.

What is a shard?

A shard is a part of an index. It is actually a Lucene instance. Lucene manages its index (a shard at the ES level) as a set of segments.

What is a segment?

A segment is an inverted index! An inverted index maps a word or group of words to the documents containing it. For each document, it records that the word occurs in the document, how frequently it appears, which words surround it, and several other statistics. It is important to realize that entries in an inverted index only have their proper value once all documents have been indexed.
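As a rough illustration of the idea, here is a minimal Python sketch of an inverted index that maps each term to the documents containing it, along with the term frequency. The function name and the whitespace tokenizer are assumptions made for this example; Lucene's real analysis chain and segment format are far richer.

```python
from collections import defaultdict

def build_inverted_index(docs):
    """Map each term to {doc_id: number of occurrences in that document}."""
    index = defaultdict(dict)
    for doc_id, text in docs.items():
        # Naive analysis: lowercase and split on whitespace.
        for term in text.lower().split():
            index[term][doc_id] = index[term].get(doc_id, 0) + 1
    return dict(index)

docs = {1: "Porto Alegre", 2: "Porto Seguro", 3: "porto porto"}
index = build_inverted_index(docs)
```

Looking up `index["porto"]` returns every document containing the term, without scanning the documents themselves.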

What happens when a document is indexed?

A naive approach would be to lock reads and writes, since adding a document to the index affects several entries in the inverted index. However, that would be very slow and would not scale.

Immutable segments

The solution is to make segments immutable. Immutability makes segments cacheable, and any derived data structure can be immutable too, which is great for performance. The only problem is that you cannot update an immutable segment.

Indexing documents

Segments are stored on disk and cached by the OS so they can be accessed quickly. The segments present on disk and in memory are listed in a commit point. Each index request first goes to the translog, a file where indexing requests are written sequentially and which is used for recovery after a crash. Once written to the translog, the requests are added to an in-memory buffer.

Indexing documents: Refresh

Every second by default (this is configurable), Lucene takes the requests in the memory buffer and creates a segment in memory. This segment is now searchable and visible to ES clients. However, the newly created segment is not in the commit point yet because it is not on disk. The translog remains untouched for recovery.

Indexing documents: Flush

A flush is basically a Linux fsync. It is forced every 5 seconds by default, which is also configurable. The fsync writes the newly created segments to disk. Lucene then creates a new commit point that includes these segments, and the translog is deleted.
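The index, refresh and flush steps can be pictured with a toy model. The sketch below is only an illustration under simplifying assumptions: `ToyShard` and its attributes are hypothetical names, everything lives in Python lists, and nothing is actually written to disk.

```python
class ToyShard:
    """Toy model of the translog / buffer / segment lifecycle (not real Lucene)."""

    def __init__(self):
        self.translog = []      # sequential log of requests, replayed after a crash
        self.buffer = []        # in-memory buffer, not searchable yet
        self.segments = []      # immutable segments, searchable
        self.commit_point = []  # segments considered safely on disk

    def index(self, doc):
        self.translog.append(doc)  # written to the translog first
        self.buffer.append(doc)

    def refresh(self):
        # Turn the buffer into a new immutable, searchable segment.
        if self.buffer:
            self.segments.append(tuple(self.buffer))
            self.buffer = []

    def flush(self):
        # Persist all segments, record them in a commit point, drop the translog.
        self.refresh()
        self.commit_point = list(self.segments)
        self.translog = []

    def search(self, word):
        return [doc for seg in self.segments for doc in seg if word in doc.split()]
```

A document becomes searchable after `refresh()`, but it is only covered by the commit point (and the translog only truncated) after `flush()`.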

How does search work in this fragmented inverted index?

Lucene keeps track of added, updated and deleted documents across segments. It first executes the search query on the oldest segment, then on the second oldest, and so on until it has searched all segments. The newest information overrides the old, and deleted documents are removed from the result.
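The oldest-to-newest walk described above can be sketched as follows. This is a toy model, not Lucene's actual data structures: segments are plain dictionaries, a `DELETED` tombstone stands in for Lucene's deletion tracking, and `search_segments` is a hypothetical helper.

```python
DELETED = object()  # tombstone marking a document deleted in a newer segment

def search_segments(segments, predicate):
    """segments: list of {doc_id: document or DELETED}, ordered oldest first."""
    results = {}
    for segment in segments:  # oldest to newest
        for doc_id, doc in segment.items():
            if doc is DELETED or not predicate(doc):
                # A newer version was deleted or no longer matches.
                results.pop(doc_id, None)
            else:
                # A newer version overrides whatever an older segment returned.
                results[doc_id] = doc
    return results

segments = [
    {1: "red car", 2: "red bike"},
    {1: "blue car"},              # update of doc 1
    {2: DELETED, 3: "red bus"},   # delete of doc 2, new doc 3
]
hits = search_segments(segments, lambda doc: "red" in doc.split())
```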

Isn't segment proliferation bad for performance?

Yes. If we keep adding new segments without merging them to keep their count low, performance will deteriorate. That's why a merge process runs in the background.

Merge

The merge happens in the background. It looks for segments of similar size, including segments that are not committed yet, and combines them into new ones. This is CPU and IO intensive, so it runs at its own pace.

How does indexing work in the cluster?

An index request arrives at a client node. By hashing the document id, the client node knows which shard owns the document. Since it has the cluster state from the master, it also knows which node holds the primary of that shard, and forwards the request to it. That node indexes the document and then forwards the request to the nodes holding replicas of the shard. When a quorum of those nodes report that they successfully indexed the document, the primary returns to the client node. Quorum is the default, but this can be configured otherwise.

How do I scale indexing?

To scale indexing, you need to spread the load, because indexing is very IO intensive and can consequently consume a fair amount of CPU. Spreading the load requires more primary shards. In other words, to scale indexing, you create more shards. The problem is that you cannot change the number of primary shards once the index is created, so you need to plan this carefully and test it beforehand.
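A minimal sketch of hash-based routing shows why the number of primary shards is fixed. It assumes the shard is chosen as hash(document id) modulo the number of primary shards; `shard_for` is a hypothetical helper and `zlib.crc32` is only a stand-in for whatever hash function ES actually uses.

```python
import zlib

def shard_for(doc_id: str, num_primary_shards: int) -> int:
    """Pick the shard owning a document from a hash of its id."""
    # crc32 is a stand-in hash chosen for this sketch.
    return zlib.crc32(doc_id.encode("utf-8")) % num_primary_shards

# With a different shard count, many ids map to a different shard, so
# documents indexed earlier could no longer be found where routing expects them.
moved = [i for i in range(100)
         if shard_for(f"doc-{i}", 5) != shard_for(f"doc-{i}", 6)]
```

Because the shard count is part of the routing computation, changing it would require moving most documents, which is why the choice has to be made up front.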

How is a search request processed?

When a search request arrives at a client node, the node cannot know which data nodes hold documents matching the request. Hence it sends the request to one copy, primary or replica, of every shard of the targeted index. This is the query phase: each shard returns the ids and scores of the documents matching the request. The client node then aggregates the results and requests from the data nodes only the documents it needs to return. This is the fetch phase.
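The two phases can be sketched as a scatter/gather. In this simplified illustration each shard is a dictionary with precomputed scores, and `query_phase` and `search` are hypothetical names; real shards compute scores from the query.

```python
import heapq

def query_phase(shard, size):
    """Return the shard's local top hits as (score, doc_id), without the documents."""
    return heapq.nlargest(size, ((doc["score"], doc_id) for doc_id, doc in shard.items()))

def search(shards, size):
    # Query phase: ask every shard, gather only ids and scores.
    candidates = []
    for shard_no, shard in enumerate(shards):
        candidates += [(score, doc_id, shard_no) for score, doc_id in query_phase(shard, size)]
    top = heapq.nlargest(size, candidates)
    # Fetch phase: retrieve only the documents that made the global top list.
    return [shards[shard_no][doc_id]["body"] for _, doc_id, shard_no in top]

shards = [
    {"a": {"score": 0.9, "body": "doc a"}, "b": {"score": 0.2, "body": "doc b"}},
    {"c": {"score": 0.7, "body": "doc c"}},
]
```

Only ids and scores travel during the query phase, so the coordinating node never has to load documents that will not be returned.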

How do I scale search?

To scale search, you want to spread the query phases, which you can do by adding replicas. The more replicas you have, the more options the client node has when sending the query. Hence you scale search by adding and removing shard replicas.

How does ES cache data?

Elasticsearch caches queries on a per-segment basis to speed up response time. This cache resides in the heap. Consuming too much heap will slow your node down. In Elasticsearch each field can be stored in 2 ways:

  • exact value (like dates)
  • full text: the text is analyzed (normalized and tokenized) and inserted into an inverted index

Elasticsearch has 2 main types of cache:

  • fielddata
  • filter

What is fielddata?

The fielddata cache is populated when sorting or aggregating on a field. It basically has to uninvert the inverted index to create an array of every field value, in document order.

Let's say we have an index with 3 documents. Two of them have a field named city, one with value Porto Alegre and the other with value Porto Seguro. We want to sort documents on the city. Here is our inverted index:

Key     Doc1  Doc2  Doc3
porto   X     X
seguro        X
alegre  X

We search for documents with city having Porto in it. Elasticsearch will:

  1. Scan the inverted index to find Doc1 and Doc2 have the word Porto in them
  2. For Doc1 and Doc2, go through every term in the index and collect the tokens of that document, creating a structure like: Doc1: porto, alegre; Doc2: porto, seguro
  3. Now that we have uninverted the inverted index, we can compile the unique tokens of all documents (porto, alegre, seguro).

This is how fielddata is created. It is then used to sort and aggregate on the field. Building fielddata can consume a lot of heap memory. Starting with ES 1.3, a circuit breaker aborts the operation when fielddata would use more than 60% of the heap.
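The uninverting step can be sketched in a few lines. This is a simplified illustration in which `uninvert` is a hypothetical helper and the index is a plain dictionary.

```python
def uninvert(inverted_index):
    """Turn term -> doc ids into doc id -> terms, the per-document view fielddata needs."""
    per_doc = {}
    for term, doc_ids in inverted_index.items():
        for doc_id in doc_ids:
            per_doc.setdefault(doc_id, []).append(term)
    return per_doc

inverted = {"porto": [1, 2], "alegre": [1], "seguro": [2]}
fielddata = uninvert(inverted)

# Sorting by city now only needs a lookup per document.
sorted_docs = sorted(fielddata, key=lambda doc_id: fielddata[doc_id])
```

Note that the whole term dictionary has to be walked to build the per-document view, which is why building fielddata is expensive in both time and heap.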

How does filtering work?

For non-scoring queries (queries that don't compute relevance), the following steps are executed:

  1. Find matching docs: use the inverted index to get the list of matching documents.
  2. For each filter, build a bitset: build an array of 0 and 1 to determine which documents should be returned (1 is for a match and 0 is for a miss). Internally, it uses a roaring bitmap, which is efficient for both sparse and dense sets.
  3. Iterate over the sparsest bitset (the one that excludes the most documents) first and compute the union/intersection required by the filtering query
  4. Increment usage counter

The inverted index is already very fast, and caching all filters wouldn't be memory efficient, so Lucene tracks the history of query usage on a per-index basis. Caching is skipped on small segments (fewer than 10k docs or less than 3% of the total index size) as they are already very fast and will soon be merged. If a filter is used more than a few times in the last 256 queries, it is cached.

The non-scoring part of a query is executed first to limit the number of documents on which the score is computed.
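Combining filters through bitsets can be sketched with plain Python integers used as bit arrays. This only illustrates the bitwise logic; it does not reproduce roaring bitmaps or Lucene's caching policy, and `bitset` and `doc_ids` are hypothetical helpers.

```python
def bitset(matching_doc_ids):
    """Build a bitset where bit i is 1 when document i matches the filter."""
    bits = 0
    for doc_id in matching_doc_ids:
        bits |= 1 << doc_id
    return bits

def doc_ids(bits):
    """List the document ids whose bit is set."""
    return [i for i in range(bits.bit_length()) if (bits >> i) & 1]

inbox = bitset([0, 2, 3, 7])    # documents matching folder = inbox
unread = bitset([2, 7, 9])      # documents matching read = false

both = doc_ids(inbox & unread)          # must + must
inbox_read = doc_ids(inbox & ~unread)   # must + must_not
```

Once a filter's bitset is cached, combining it with other filters costs only a few bitwise operations per segment.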

What is a Frame Of Reference?

A segment can contain up to 2^31-1 documents. They are stored sequentially and can be referenced by their id, from 0 to 2^31-1. This id is called the DocId and serves as the index of the document in the segment. The inverted index needs to map a term to a list of matching documents, called a posting list, and those doc ids are a perfect fit as they can be compressed efficiently.

In order to compute intersections and unions efficiently, we need those posting lists to be sorted. To store the inverted index on disk, Lucene will:

  1. Split posting lists into blocks of 256 doc ids.
  2. For each block, perform delta encoding so that each id is stored as the difference from the previous one, which yields small numbers that often fit in 1 byte: [73, 300, 302, 332, 343, 372] becomes [73, 227, 2, 30, 11, 29]
  3. For each block, perform bit packing: compute the number of bits needed to encode the largest delta, put this number in front of the block and encode all deltas with this size.
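Delta encoding and bit packing for one block can be sketched as below. The helper names are hypothetical, and the packed block is represented as a Python integer rather than Lucene's on-disk layout.

```python
def delta_encode(doc_ids):
    """Store each sorted doc id as the difference from the previous one."""
    deltas, previous = [], 0
    for doc_id in doc_ids:
        deltas.append(doc_id - previous)
        previous = doc_id
    return deltas

def delta_decode(deltas):
    doc_ids, current = [], 0
    for delta in deltas:
        current += delta
        doc_ids.append(current)
    return doc_ids

def bit_pack(deltas):
    """Pack all deltas using the bit width of the largest one."""
    width = max(1, max(deltas).bit_length())
    packed = 0
    for delta in deltas:
        packed = (packed << width) | delta
    return width, packed

def bit_unpack(width, packed, count):
    mask = (1 << width) - 1
    return [(packed >> (width * i)) & mask for i in reversed(range(count))]

deltas = delta_encode([73, 300, 302, 332, 343, 372])
width, packed = bit_pack(deltas)
```

Here the largest delta is 227, so every value in the block is stored on 8 bits instead of the 32 bits of a plain integer.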

What is a Roaring bitmap?

For filtering, we need to map (filter, segment) to a list of Doc Ids. Constraints are:

  • compression isn't the most critical part, latency is
  • cache needs to be faster than applying the filter
  • cached filter is stored in memory while posting-lists are stored on disk

Option 1: Integer array

Very CPU efficient but very bad in terms of memory, as this technique uses 4 bytes per match.

Option 2: Bitmap

A bitmap stores a 1 for a match and a 0 for a miss, so it always uses 1 bit per document in the segment. For very dense sets this is very effective. However, for very sparse sets it is very inefficient in terms of memory: you always use the same amount of memory, no matter how many matches you have. It was used until Lucene 5. It is also very CPU efficient, as iterating comes down to counting consecutive zeros, and CPUs have specific instructions for that.

Option 3: roaring bitmaps

This encoding takes the best of options 1 and 2.

  1. Split posting lists into blocks depending on the 16 highest bits: the first block contains values from 0 to 2^16-1 (65535), the second block contains values from 2^16 (65536) to 2^17-1 (131071), and so on.
  2. Encode the 16 lowest bits of every number as a short (2 bytes)
  3. If a block has more than 4096 values, encode it as a bit set, otherwise as an array. 4096 is the threshold at which a bitmap starts being more memory efficient than an array.

Example:

  1. Doc ids: 1000 62101 131385 132052 191173 196658
  2. (block, 16 lowest bits) pairs: (0, 1000) (0, 62101) (2, 313) (2, 980) (2, 60101) (3, 50)
  3. Per-block arrays: [1000, 62101] [] [313, 980, 60101] [50]
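The same example can be reproduced with a short sketch. `roaring_blocks` is a hypothetical helper that keeps only non-empty blocks in a dictionary; it illustrates the block split and the array/bitset switch, not Lucene's implementation.

```python
def roaring_blocks(doc_ids, threshold=4096):
    """Split doc ids by their 16 highest bits and pick an encoding per block."""
    blocks = {}
    for doc_id in sorted(doc_ids):
        # Key: 16 highest bits. Value: 16 lowest bits (fits in a 2-byte short).
        blocks.setdefault(doc_id >> 16, []).append(doc_id & 0xFFFF)
    encoded = {}
    for key, lows in blocks.items():
        if len(lows) > threshold:
            # Dense block: 65536 bits = 8192 bytes, whatever the number of matches.
            bits = bytearray(65536 // 8)
            for low in lows:
                bits[low >> 3] |= 1 << (low & 7)
            encoded[key] = ("bitset", bytes(bits))
        else:
            # Sparse block: 2 bytes per match.
            encoded[key] = ("array", lows)
    return encoded

example = roaring_blocks([1000, 62101, 131385, 132052, 191173, 196658])
```

The 4096 threshold follows from the sizes involved: 4096 values at 2 bytes each take 8192 bytes, which is exactly the size of a 65536-bit bitset.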

How does the filter cache work?

Filter caches are roaring bit sets. When a filter is used frequently enough on a big enough segment, it will be kept in memory. Each time a new document is indexed, the bitset is updated so filters are real time.

A cached filter can be reused by any query that contains it, whatever surrounds the filter.

{
  "bool": {
    "must": [
      { "term": { "folder": "inbox" }}, (1)
      { "term": { "read": false }}
    ]
  }
},
{
  "bool": {
    "must_not": {
      "term": { "folder": "inbox" } (2)
    },
    "must": {
      "term": { "important": true }
    }
  }
}

(1) and (2) are identical and will use the same bitset.

Cache eviction is done on an LRU basis.

Let's talk about heap

But first, let's refresh our memory about how compressed oops work.

Compressed Ordinary Object Pointer

Problem being solved

There are 3 problems here:

  1. A 32-bit architecture only lets you address around 4GB of heap, which is too little for modern applications like ES. However, 32-bit applications are usually faster than 64-bit ones.
  2. A 64-bit architecture uses 64-bit memory addresses, twice the size of those on a 32-bit architecture. Even though this lets you address up to 16EB of memory, you just don't need that much, so most of those address bits are a pure waste of memory. Consequently your application uses more memory on a 64-bit architecture than on a 32-bit one.
  3. Nowadays memory size isn't the problem anymore. The limitation is the bandwidth available to access the data in memory.

Solution

Memory alignment

When the CPU loads data from memory into its cache, it does so in words of 8 bytes. To get a higher probability of CPU cache hits, the JVM only places objects at addresses that are multiples of 8 bytes. In other words, an object address can only be 0, 8, 16, 24, ...

Oops

As an object can only be located at a multiple of 8 bytes, we can divide addresses by 8 (2^3) and store the result to use less space: 1 maps to address 8, 2 maps to address 16, ... Encoding and decoding is just a matter of shifting by 3 bits to the right or to the left, which is very fast. By storing this number instead of the actual address, we can address 2^(32+3) = 2^35 bytes = 32GB with 32 bits. With this memory alignment and compressed oops, we are limited to about 4 billion objects in a 32GB heap.

Compressed oops are enabled by default on heaps below 32GB.

Zero based Compressed oops

Everything we have discussed about compressed oops so far assumes that the OS lets the JVM reserve its heap at virtual address zero. If the OS allows this allocation, then everything works as described before: converting between compressed oops and oops is just a matter of scaling by 8. Otherwise, the JVM needs to add the heap base address during decoding and subtract it during encoding, which has a significant impact on performance. The cutoff for using zero-based compressed oops varies between operating systems, but 26GB is a conservative cutoff across a variety of them.
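The arithmetic can be checked with a small sketch. It models addresses as Python integers and uses `None` for NULL; `encode_oop`, `decode_oop` and `heap_base` are hypothetical names, and the real JVM does this in generated machine code.

```python
ALIGNMENT_SHIFT = 3  # objects sit on 8-byte (2**3) boundaries

def encode_oop(address, heap_base=0):
    """Compress a 64-bit address into a narrow oop (None models NULL)."""
    if address is None:
        return None
    offset = address - heap_base  # extra subtraction when not zero-based
    if offset % 8 != 0:
        raise ValueError("objects must be 8-byte aligned")
    return offset >> ALIGNMENT_SHIFT

def decode_oop(narrow_oop, heap_base=0):
    """Expand a narrow oop back into a full address."""
    if narrow_oop is None:
        return None
    return heap_base + (narrow_oop << ALIGNMENT_SHIFT)  # extra addition when not zero-based

# 2**32 distinct narrow oops, each covering 8 bytes.
MAX_ADDRESSABLE_BYTES = (2 ** 32) << ALIGNMENT_SHIFT
```

With `heap_base=0` both functions reduce to a single shift, which is the zero-based case.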

Code in case of non zero based compressed oops

if (<narrow-oop> == NULL)
    <wide_oop> = NULL
else
    <wide_oop> = <narrow-oop-base> + (<narrow-oop> << 3)

To know whether you are using zero-based compressed oops, run the JVM with the options -XX:+UnlockDiagnosticVMOptions -XX:+PrintCompressedOopsMode and look for the line:

heap address: 0x000000011be00000, size: 27648 MB, zero based Compressed Oops

If zero-based compressed oops aren't enabled, you will see:

heap address: 0x0000000118400000, size: 28672 MB, Compressed Oops with base: 0x00000001183ff000

ES checks this information by default and returns it in the node info API as well as in the startup logs.

ES advice

Use as little heap as you can, so GC pauses are shorter and the OS can cache more files. G1 GC will be the default GC in Java 9. Although it is optimized for big heaps, it has exposed scary bugs in Lucene, so it is still not recommended for ES.

How to reduce heap usage?

  • Use doc values instead of field data wherever you can
  • Disk based norms. The Lucene norm is part of the scoring computation. It is independent of the query, so it is computed at indexing time. Lucene supports storing it on disk.
  • Use doc values instead of multi-fields. Multi-fields are when you index a field in various ways, like text for full text search and keyword for aggregations and sorting.
  • Do not over-shard. The more shards a search request touches, the more data you load due to per-shard overhead.
  • Avoid overly large bulk index requests (32MB is ok but 256MB is not)
  • Page your search, do not request too many hits at a time
  • Avoid requesting too many aggregation bucket or deeply nested aggregations
  • Consider trading performance for memory and use breadth_first collection mode for deep aggregations.

What are doc values?

2 access patterns:

  • Search: you have a term and are looking for matching documents. Inverted index is your friend!
  • Sorting, aggregating, access a specific field: you have the documents and you are looking for a field in them. Inverted index is your enemy!

Doc values are your friend in the latter case. At index time, field values are stored on disk in a column-oriented fashion, which makes them easy to sort and aggregate. Almost all field types support this feature and it is enabled by default. Analyzed fields cannot be stored as doc values.

Doc values are generated per segment and are consequently immutable and serialized on disk. They are cached by the OS the same way segments are.

Column store compression

As data is stored by column, each column can be compressed in its own efficient way. For a column of numbers, ES looks for the greatest common divisor and divides each number by it to use less space. For example, a column with values [100, 400, 1500] would be compressed to [1, 4, 15].

The following compression schemes are checked in order:

  1. If all the values are identical (or missing), set a flag and record the value
  2. If there are fewer than 256 values, a simple encoding table is used
  3. If there are more than 256 values, check whether there is a common divisor
  4. If there is no common divisor, encode each value as an offset from the smallest value

Strings are de-duplicated, sorted into a table and assigned an id. Those ids are then used as numeric doc values. Once they are numeric, they benefit from all the compression schemes described above.
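Three of these schemes are easy to sketch: common-divisor encoding, offset encoding and string ordinals. The helper names are hypothetical and the output is a plain Python tuple, not the on-disk doc values format.

```python
from functools import reduce
from math import gcd

def gcd_encode(values):
    """Divide every value by the greatest common divisor of the column."""
    divisor = reduce(gcd, values)
    return divisor, [v // divisor for v in values]

def offset_encode(values):
    """Store every value as an offset from the smallest value of the column."""
    smallest = min(values)
    return smallest, [v - smallest for v in values]

def ordinals(strings):
    """De-duplicate and sort strings into a table, then replace each by its id."""
    table = sorted(set(strings))
    ids = {s: i for i, s in enumerate(table)}
    return table, [ids[s] for s in strings]

divisor, scaled = gcd_encode([100, 400, 1500])
table, string_ids = ordinals(["porto", "alegre", "porto"])
```

In every case the original column can be rebuilt from the small header (divisor, smallest value or table) and the compact values.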

Cool, but what about efficient aggregation on analyzed fields?

For aggregations, you can use multi-fields: you index the field both as an analyzed string and as a raw, non-analyzed string.

What about sorting?

Doc values work well for strings that are not too big. An analyzed string can be a PDF with thousands of words, which is why doc values don't handle analyzed strings. Instead, ES uses another structure, fielddata, which lives in the heap. As a consequence, it is less scalable and has several edge cases.

How to co-locate related data on the same shard?

You can use custom routing:

  1. When creating your index, mark _routing as required
  2. For each document, specify the routing value at index time
  3. Specify the routing value at query time

Doing so limits the number of shards queried and speeds up the search. However, be careful not to create hot spots!

Example from a Datadog article:

curl -XPUT "localhost:9200/blog_index" -d '
{
  "mappings": {
    "blogger": {
      "_routing": {
        "required": true 
      }
    }
  }
}'

curl -XPUT "localhost:9200/blog_index/blogger/1?routing=blogger1" -d '
{
  "comment": "blogger1 made this cool comment"
}'

curl -XGET "localhost:9200/blog_index/_search?routing=blogger1" -d '
{
  "query": {
    "match": {
      "comment": {
        "query": "cool comment"
      }
    }
  }
}'

How many primary shards should I create?

1 primary shard per node is a good idea, as it spreads the indexing load evenly. Fewer than 1 primary shard per node creates hot spots: nodes holding a primary shard handle more index requests, while nodes without one are less stressed. Be careful not to put too many shards on a node either, as this stresses the node.

Zen discovery process

There is absolutely no documentation about this part besides the code. Here is what can be learned from the ES 5.1 code.

The node starts when the method start is called in class Node. Several services are started, including the cluster and discovery service.

Cluster state

It represents the state of the cluster. It is immutable, except for the routing nodes structure, which is built on demand based on the routing table and cluster status. It can only be updated by the master node and is controlled by a single thread in the ClusterService. After every update, the publish method of the Discovery class sends the new version of the cluster state to all other nodes. The publishing process depends on the type of discovery chosen: Zen, AWS, GCP, Azure, etc.

Cluster state diff

The cluster state implements the Diffable interface in order to support publishing a cluster state diff instead of the full state. To verify that diffs are applied in the right order, each state contains a uuid that a diff can refer to. If the current state uuid and the diff uuid don't match, an exception is thrown and the publishing mechanism should send the full state. When a node joins the cluster, a full state is sent to it.
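The uuid check can be illustrated with a small sketch. All class and function names here are hypothetical and do not mirror the ES classes; the state content is reduced to a flat dictionary and removed keys are ignored.

```python
from dataclasses import dataclass

class IncompatibleStateError(Exception):
    """Raised when a diff does not apply to the current state."""

@dataclass(frozen=True)
class ClusterState:
    state_uuid: str
    version: int
    data: dict

@dataclass(frozen=True)
class StateDiff:
    from_uuid: str   # uuid of the state this diff was computed against
    to_uuid: str
    to_version: int
    changes: dict

def make_diff(old, new):
    # Keep only the entries that changed (removals are ignored in this sketch).
    changes = {k: v for k, v in new.data.items() if old.data.get(k) != v}
    return StateDiff(old.state_uuid, new.state_uuid, new.version, changes)

def apply_diff(current, diff):
    if current.state_uuid != diff.from_uuid:
        # The sender should fall back to publishing the full state.
        raise IncompatibleStateError("diff was built against another state")
    return ClusterState(diff.to_uuid, diff.to_version, {**current.data, **diff.changes})
```

A node that missed an intermediate state fails the uuid check and has to receive the full state instead.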

Cluster state content

The cluster state is composed of:

  • version
  • id: uuid
  • routing table: cluster wide routing table for all indices. It is also versioned
  • discovery nodes: class holding all discovery nodes in the cluster and providing methods to access, merge and diff them. Discovery nodes are data, master and ingest nodes. A discovery node represents a node which is part of the cluster. A discovery node object contains its name, address, hostname, id, attributes and roles.
  • metadata: cluster, indices, templates, aliases and custom metadata
  • cluster blocks: represent cluster level blocks to block dirty operations done against the cluster. A cluster block has an id, a description, can be retryable, has a status and a set of level of operation. The level can be READ, WRITE, METADATA_READ or METADATA_WRITE. Cluster blocks is initialized with global blocks and indices blocks.
  • custom properties
  • name
  • status: can be UNKNOWN, RECEIVED, BEING_APPLIED or APPLIED
  • routing nodes (built on demand): represents routing information contained in the cluster state. It can be updated with the following methods: initializeShard, startShard, relocateShard, failShard. It maps shards to RoutingNode. A routing node is a discovery node together with the shards it hosts.

Zen Discovery Service

Zen discovery service is composed of:

  • transport service: manages connection and communication with other cluster nodes. It has a pluggable transport mode which can be either local, tcp or used for mocking transport behaviour.
  • cluster service which manages the cluster state
  • allocation service: manages shard allocation and shard re-routing.
  • cluster name
  • discovery settings
  • Zen ping
  • master fault detection
  • nodes fault detection
  • publish cluster state action
  • membership action
  • elect master service
  • Join thread control
  • node join controller
  • node removal executor

Zen Ping

Action name: "internal:discovery/zen/unicast"

Creates DiscoveryNode objects from a list of hosts and pings those hosts with retry logic.

Master fault detection

Action name: "internal:discovery/zen/fd/master_ping"

It uses ping failures to detect a fault and has its own retry mechanism. In case of master failure, it calls the cluster service method submitStateUpdateTask so that a task is run on every discovery node. This task first checks whether the node running it is already master, in which case it stops there. Otherwise, it removes the old master node from the discovery nodes, flushes pending state changes from the old master to avoid having it elected again, and calls a rejoin.

Rejoin is like a reboot. It stops the node and master failure detection, starts a new master election and ask everybody to rejoin.

Nodes fault detection

Action name: "internal:discovery/zen/fd/ping"

Creates one node fault detection object per known node. This node fault detection pings the node to make sure it is still alive. In case of failure, if we are not the master node, we do nothing. If we are the master node, we submit a task to the cluster service to remove the node from the state.

Publish cluster state action

  • Send action name: "internal:discovery/zen/publish/send"
  • Commit action name: "internal:discovery/zen/publish/commit"

Its main method is publish. It publishes the change first to master eligible nodes. If more than discovery.zen.minimum_master_nodes acknowledge the change, then it is published to the other nodes. If this is the first publish or if the node is new, a full serialized state is sent to the node. Otherwise a diff is sent.

Membership action

  • Discovery join action: "internal:discovery/zen/join"
  • Discovery validate join action: "internal:discovery/zen/join/validate"
  • Discovery leave action: "internal:discovery/zen/leave"

The validation only checks whether the joining node managed to serialize the cluster state.

Handling join request

  1. Use the transport service to check that the new node address is supported. If it isn't, the node just logs that it received an invalid address in a join. The TCP validation just makes sure it is an instance of InetSocketTransportAddress, a class encapsulating java.net.InetSocketAddress.
  2. The node tries to connect to the joining node using the transport service.
  3. It sends a validate join to the joining node. If validation fails, it sends a message to the joining node with the exception.
  4. If validation succeeds, we handle the join request and fall into 2 cases:
  • We are in the middle of an election: we add the incoming join to a list and check whether there is an ongoing request to become master and whether we have enough pending join requests.
  • Otherwise: we update the cluster state to add the node.

Handling leave request

If the node receiving the request is the master, it just submits a task to the cluster service to update the state. If the node is not the master and the leaving node is the master, it proceeds as if the master node had failed: a master re-election that makes sure the previous master cannot be elected again, followed by a join from every node to the new master.

Node join controller

Joining nodes are either added directly to the cluster state or accumulated during a master election.

Node removal cluster state task executor

It is a cluster state task executor and cluster state task listener. It is composed of:

  • Allocation service which takes care of assigning shards to nodes
  • Elect master service which takes care of the master election
  • A rejoin function taking as argument a cluster state and a string and returning a new cluster state.

It removes the nodes from the cluster state. Then it checks whether there are enough nodes with the master role. If there aren't enough master eligible nodes, it calls the rejoin function, otherwise it calls the allocation service to disassociate shards from the removed nodes.

Elect master service

A master candidate must have the master role. Candidates are then ranked by the cluster state version they hold (the higher wins) and, in case of a tie, by node id (the smallest wins, as described in the node start section). An election only occurs if at least discovery.zen.minimum_master_nodes candidates are present. This number should be set to a quorum of the master eligible nodes, that is (number of master eligible nodes / 2) + 1.

Join thread control

This class makes sure the thread joining the cluster is the same one that manages the cluster state. This is important to keep the background joining process in sync with any cluster state updates, like master loss or failure to join, received while joining.

Node start

The ZenDiscovery method startInitialJoin is the first method to be called. It submits a task to the cluster service, which runs regardless of the node role and starts a join on the join thread control. This first join triggers a master election for the starting node. The master election consists of:

  1. send a ping to all nodes listed in the configuration and wait for their response. Each ping response contains the cluster name, the node details (DiscoveryNode), the master node details (DiscoveryNode object) and the cluster state version.
  2. Add itself to the list of ping responses
  3. Filter or not the ping responses based on discovery.zen.master_election.ignore_non_master_pings. If this setting is true, it only keeps the response from master eligible nodes.
  4. From the filtered ping response it builds the list of active master nodes which aren't itself
  5. From the filtered ping response it builds the list of master eligible nodes
  6. If there are already elected masters, choose the one that is master eligible and has the smallest id.
  7. If there isn't any elected master and we have enough master eligible nodes, then we can deduce the master node based on the following priority rules:
  • Higher cluster state version
  • Master eligible
  • Lexicographically smallest id
  8. If the current node is the master node, we wait for enough other nodes to join (determined by discovery.zen.minimum_master_nodes). If there are enough joins, we are the master and start the node failure detector, otherwise we restart the join process expecting another node to be the master.
  9. If the current node isn't the master, we send a join to the master node and then submit a state update task to be added to the cluster. From there, several situations can occur:
  • If this update task fails, we restart the joining process looking for another master
  • If we don't find the master in the retrieved state, we start over again looking for another master
  • If the master in the state doesn't match the one we found previously, we start a re-join
  • If the master in the state is the same as the one we found, we start the master fault detection.
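The priority rules above can be sketched as a sort key. This is an illustration of the ordering described in these notes, not the ES code: `Candidate`, `quorum` and `elect_master` are hypothetical names, and non master-eligible nodes are simply filtered out before ranking.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Candidate:
    node_id: str
    cluster_state_version: int
    master_eligible: bool = True

def quorum(master_eligible_nodes: int) -> int:
    """Usual recommendation for discovery.zen.minimum_master_nodes."""
    return master_eligible_nodes // 2 + 1

def elect_master(candidates, minimum_master_nodes):
    eligible = [c for c in candidates if c.master_eligible]
    if len(eligible) < minimum_master_nodes:
        return None  # not enough master eligible nodes: no election
    # Higher cluster state version first, then smallest id.
    return min(eligible, key=lambda c: (-c.cluster_state_version, c.node_id))

nodes = [Candidate("b", 5), Candidate("a", 4), Candidate("c", 5),
         Candidate("0", 9, master_eligible=False)]
winner = elect_master(nodes, quorum(3))
```

Every node applies the same deterministic ordering to its ping responses, which is what lets them converge on the same master without further coordination.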

Rejoin process:

  1. Stop master and node failure detection
  2. Enable the no master block
  3. Remove the master node from the local cluster state
  4. Start the join thread control

Discovery with AWS Plugin

The AWS plugin extends ZenDiscovery. The main change is the way other nodes are discovered: instead of reading them from the configuration, the plugin retrieves them from the AWS API based on a list of security groups. The instances can be filtered by tags and availability zones.
