Document-oriented NoSQL store and search engine that stores and retrieves documents in near real time. It can also be seen as an enhanced distributed cluster of Lucene, which is a full-text search library written in Java.
Master/slave architecture. Usually 3 master-eligible nodes with only one acting as master. Client nodes coordinate requests to the cluster and data nodes actually contain the data.
Shard allocation can be based on:
- Cluster level shard allocation: balance shards across all data nodes
- Fault tolerance (avoid having 2 replicas of the same shard on the same node for example)
- Disk based shard allocation
- Shard allocation awareness based on racks and availability zones (AZ)
- Shard allocation filtering to be able to decommission nodes (a settings sketch follows this list)
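A hedged sketch of the dynamic cluster settings behind these mechanisms; the setting names are real cluster settings, while the values and node name are illustrative, assuming a local cluster and the Python requests library:

```python
import requests

requests.put(
    "http://localhost:9200/_cluster/settings",
    json={
        "persistent": {
            # Shard allocation awareness: spread shard copies across racks/AZs.
            "cluster.routing.allocation.awareness.attributes": "rack_id",
            # Disk-based shard allocation: stop allocating to nodes above 85% disk usage.
            "cluster.routing.allocation.disk.watermark.low": "85%",
            # Allocation filtering: drain a node before decommissioning it.
            "cluster.routing.allocation.exclude._name": "node-to-decommission",
        }
    },
)
```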
Every node has a list of seed nodes in its configuration.
- Connect to one of the seed nodes
- Retrieve the cluster state from it
- Send a join request to the master
- The master updates the cluster state
The master uses unicast TCP connections to discover and ping every node, and from these pings it deduces node health. Only the master can update the cluster state.
Any non-client node can be configured as master-eligible. For an election to happen, a minimum number of master-eligible nodes needs to participate. By default, only master-eligible nodes and data nodes can participate in the election.
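A hedged sketch of the related discovery settings as they would appear in elasticsearch.yml, shown here as a Python dict; the setting names follow the pre-7.x zen discovery these notes describe, and the host addresses are made up:

```python
# Hypothetical seed list and election quorum for a cluster with 3 master-eligible nodes.
discovery_settings = {
    "discovery.zen.ping.unicast.hosts": ["10.0.0.1", "10.0.0.2", "10.0.0.3"],
    # quorum = (master_eligible_nodes / 2) + 1 = 2 for 3 master-eligible nodes
    "discovery.zen.minimum_master_nodes": 2,
}
```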
Every index is composed of shards. Those shards are pieces of the index and each one is actually a Lucene engine. Each shard has a primary and one or several replicas.
A shard is a part of an index. It is actually a Lucene instance. Lucene manages its index (a shard at the ES level) with segments.
A segment is an inverted index! An inverted index maps a word or group of words to the documents containing it. For each document, it records that the word is contained in the document, how frequently it appears, which words surround it, and several other statistics. It is important to realize that entries in an inverted index can only have their proper values once all documents have been indexed.
A naive approach would be to lock writes and reads, as adding a document to the index affects several entries of the inverted index. However, it wouldn't scale and would be very slow.
The solution is to have immutable segments. Immutability makes those segments cacheable, and any derived data structure can be immutable too, which is great for performance. The only problem is that you cannot update an immutable segment.
Segments are stored on disk and cached by the OS so they can be accessed quickly. The segments present on disk and in memory are listed in a commit point. Each index request first goes to the translog, a file where indexing requests are sequentially written. It is used in case of a crash. Once written to the translog, those requests are processed in a memory buffer.
Every second by default (this is configurable), Lucene takes the requests from the memory buffer and creates a segment in memory. This segment is now searchable and visible to ES clients. However, this newly created segment isn't in the commit point because it isn't on disk yet. The translog remains untouched for recovery.
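A hedged example of tuning that refresh interval, assuming a cluster on localhost:9200 and a hypothetical index named my_index:

```python
import requests

# Refresh less often to trade search freshness for indexing throughput.
requests.put(
    "http://localhost:9200/my_index/_settings",
    json={"index": {"refresh_interval": "30s"}},
)

# Or force a refresh manually, e.g. in a test that needs documents
# to be searchable immediately after indexing.
requests.post("http://localhost:9200/my_index/_refresh")
```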
A flush is basically a Linux fsync. It is forced every 5 seconds by default (this is also configurable). This fsync writes the newly created segments to disk; Lucene then creates a new commit point including those segments, and the translog is cleared.
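A hedged sketch of the related translog knobs; my_index is hypothetical, and setting names and defaults vary a bit across ES versions, so treat this as an illustration rather than a recipe:

```python
import requests

# fsync the translog every sync_interval instead of on every request,
# trading a small durability window for indexing throughput.
requests.put(
    "http://localhost:9200/my_index/_settings",
    json={
        "index.translog.durability": "async",
        "index.translog.sync_interval": "5s",
    },
)

# Trigger a flush (Lucene commit) manually.
requests.post("http://localhost:9200/my_index/_flush")
```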
Lucene keeps track of added, updated and deleted documents between segments. It will first execute the search query on the oldest segment, then on the second oldest, until it has searched all segments. The newest information overrides the old, and deleted documents are removed from the result.
If we keep adding new segments without merging them to maintain a low segment count, performance will deteriorate. That's why there is a merge process happening in the background.
The merge happens in the background. It tries to find segments of similar size, including segments not yet committed, and merges them into new ones. This is CPU and IO intensive, so it happens at its own frequency.
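Because merging is expensive, it is sometimes forced manually on indices that no longer receive writes. A hedged example with a hypothetical index name:

```python
import requests

# Merge an old, read-only index down to a single segment per shard.
requests.post(
    "http://localhost:9200/my_index/_forcemerge",
    params={"max_num_segments": 1},
)
```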
An index request arrives at a client node. Based on the document id hashing, the node knows which shard will own this document. Having the cluster state from the master, it also knows which instance holds the primary of this particular shard, so it forwards the request to that node. That node indexes the document before forwarding the request to the nodes holding replicas of this shard. When a quorum of those nodes reports that it successfully indexed the document, the response is returned to the client node. Quorum is the default, but it can be configured otherwise.
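A minimal sketch of that shard selection, assuming the classic formula shard = hash(_routing) % number_of_primary_shards; ES actually uses a Murmur3 hash of the routing value (which defaults to the document id), and crc32 only stands in for it here:

```python
import zlib

def pick_shard(routing_value: str, number_of_primary_shards: int) -> int:
    # Stand-in for the Murmur3 hash ES applies to the _routing value.
    return zlib.crc32(routing_value.encode()) % number_of_primary_shards

# Every coordinating node computes the same shard for the same document id,
# so any of them can forward the request to the right primary.
print(pick_shard("doc-42", 5))
```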
In order to scale indexing, you need to spread the load, as indexing is very IO intensive and can also consume a fair amount of CPU. To spread the load, you need more primary shards. In other words, to scale indexing, you create more shards. The problem is that you cannot change the number of shards once the index is created, so you need to plan it carefully and test it beforehand.
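A hedged example of fixing the primary shard count at creation time (index name and counts are made up):

```python
import requests

# number_of_shards cannot be changed later; number_of_replicas can.
requests.put(
    "http://localhost:9200/logs-2017",
    json={"settings": {"number_of_shards": 6, "number_of_replicas": 1}},
)
```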
When a search request arrives at a client node, it obviously cannot know which data nodes hold the relevant documents. Hence it does a shotgun request to all shards of the targeted index; those shards can be primaries or replicas. This is the query phase. Each shard returns the ids and scores of the documents matching the request. The client node then aggregates the results and requests from the data nodes only the documents it actually needs to return. This is the fetch phase.
In order to scale search, you want to spread those query phases. You can do this by having more replicas: the more replicas you have, the more options the client node has when routing the query. Hence you scale search by adding (or removing) replicas on shards.
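Unlike the primary shard count, the replica count can be changed on a live index. A hedged example:

```python
import requests

# Add a replica per shard to spread the query phase over more copies.
requests.put(
    "http://localhost:9200/logs-2017/_settings",
    json={"index": {"number_of_replicas": 2}},
)
```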
Elasticsearch caches queries on a per-segment basis to speed up response time. This cache resides in the heap, and consuming too much heap will slow your node down. In Elasticsearch, each field can be stored in 2 ways:
- exact value (like dates)
- full text: the text will be indexed (normalized, tokenized and inserted into an inverted index)

Elasticsearch has 2 main types of cache:
- fielddata
- filter
The fielddata cache is populated when sorting or aggregating on a field. Elasticsearch basically has to uninvert the inverted index to create an array of every field value per field, in document order.
Let's say we have an index with 3 documents. Two of them have a field named city, one with value Porto Alegre and the other with value Porto Seguro. We want to sort documents on city. Here is our inverted index:
Key | Doc1 | Doc2 | Doc3 |
---|---|---|---|
porto | X | X | |
alegre | X | | |
seguro | | X | |
We search for documents with city having Porto in it. Elasticsearch will:
- Scan the inverted index to find Doc1 and Doc2 have the word Porto in them
- For Doc1 and Doc2, it will go through every term in the index and collect the tokens belonging to each document, creating a structure like: Doc1 → porto, alegre; Doc2 → porto, seguro
- Now that we have uninverted the inverted index, we can compile the unique tokens of all documents (porto, alegre, seguro).
This is how fielddata is created; it is then used to sort and aggregate on a field. Compiling fielddata can consume a lot of heap memory. Starting with ES 1.3, a circuit breaker aborts the request when fielddata would use more than 60% of the heap.
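A minimal sketch of that uninversion step using the toy index above; this only shows the data-structure transformation, not how Lucene actually builds fielddata:

```python
# Toy inverted index from the example above: term -> matching doc ids.
inverted_index = {
    "porto": ["Doc1", "Doc2"],
    "alegre": ["Doc1"],
    "seguro": ["Doc2"],
}

def uninvert(index):
    """Build doc -> list of terms, the shape needed to sort or aggregate."""
    fielddata = {}
    for term, docs in index.items():
        for doc in docs:
            fielddata.setdefault(doc, []).append(term)
    return {doc: sorted(terms) for doc, terms in fielddata.items()}

print(uninvert(inverted_index))
# {'Doc1': ['alegre', 'porto'], 'Doc2': ['porto', 'seguro']}
```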
For non-scoring queries - queries that don't compute relevance - the following steps are executed:
- Find matching docs: use the inverted index to get the list of matching documents.
- For each filter, build a bitset: an array of 0s and 1s that determines which documents should be returned (1 for a match, 0 for a miss). Internally, it uses a roaring bitmap, which is efficient for both sparse and dense sets.
- Iterate over the sparsest bitset (the one that excludes most documents) first and compute intersections/unions to match the filtering query
- Increment usage counter
The inverted index is already very fast, and caching all filters wouldn't be memory efficient, so Lucene tracks the history of query usage on a per-index basis. Caching is skipped on small segments (fewer than 10k documents or less than 3% of the total index size), as they will soon be merged and are already very fast. If a filter is used more than a few times in the last 256 queries, it is cached.
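A hedged toy version of that caching heuristic, using the thresholds quoted above; the exact meaning of "a few times" is an assumption here, and the real Lucene policy differs in detail:

```python
from collections import deque

class FilterCachePolicy:
    """Toy policy: remember the last 256 filters used and cache a filter once
    it has been used a few times on a segment big enough to be worth it."""

    def __init__(self, history_size=256, min_usages=5):  # min_usages is a guess
        self.history = deque(maxlen=history_size)
        self.min_usages = min_usages

    def on_use(self, filter_key):
        self.history.append(filter_key)

    def should_cache(self, filter_key, segment_docs, index_docs):
        # Small segments are skipped: they are fast anyway and will soon be merged.
        if segment_docs < 10_000 or segment_docs < 0.03 * index_docs:
            return False
        return self.history.count(filter_key) >= self.min_usages
```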
The non-scoring part of a query is executed first, to limit the number of documents on which the score will be computed.
A segment can contain up to 2^31-1 documents. They are stored sequentially and can be referenced by their id, from 0 to 2^31-1; this is called the doc id and serves as the index of the document in the segment. The inverted index needs to map a term to a list of matching documents; this is called a posting list, and those doc ids are a perfect fit as they can be compressed efficiently.
In order to compute intersections and unions efficiently, we need those posting lists to be sorted. To store the inverted index on disk, Lucene will:
- Split posting lists into blocks of 256 doc ids.
- For each block, perform delta encoding so that only the gaps between consecutive doc ids are stored: [73, 300, 302, 332, 343, 372] becomes [73, 227, 2, 30, 11, 29]
- For each block, perform bit packing: compute the number of bits needed to encode the biggest delta, put this number in front of the block and encode all deltas with that width (see the sketch after this list).
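A minimal sketch of these two steps, reproducing the deltas from the example above (a simplification of Lucene's frame-of-reference encoding; real blocks hold 256 doc ids):

```python
def delta_encode(doc_ids):
    """Keep the first doc id, then only the gaps to the previous one."""
    return [doc_ids[0]] + [b - a for a, b in zip(doc_ids, doc_ids[1:])]

def bit_pack(deltas):
    """Store the bit width of the biggest delta in the block header, then
    encode every delta with that width."""
    width = max(deltas).bit_length()
    packed = 0
    for d in deltas:
        packed = (packed << width) | d
    return width, packed

deltas = delta_encode([73, 300, 302, 332, 343, 372])
print(deltas)            # [73, 227, 2, 30, 11, 29]
print(bit_pack(deltas))  # width 8: the six deltas fit in 48 bits instead of 6 ints
```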
For filtering, we need to map (filter, segment) to a list of Doc Ids. Constraints are:
- compression isn't the most critical part, latency is
- cache needs to be faster than applying the filter
- cached filter is stored in memory while posting-lists are stored on disk
The first option is an array of integers holding the matching doc ids. It is very CPU efficient but very bad in terms of memory, as this technique uses 4 bytes per match.
The second option is a bitmap: store a 1 for a match and a 0 for a miss. In terms of memory, it always uses 1 bit per document. For very dense sets this is very effective, but for very sparse sets it is very inefficient: you always use the same amount of memory, no matter how many matches you have. This was the approach used until Lucene 5. It is also very CPU efficient, as counting consecutive 0s is something CPUs have specific instructions for.
Roaring bitmaps, the third option, take the best of options 1 and 2.
- Split posting lists into blocks depending on the 16 highest bits: the first block contains values from 0 to 2^16-1 (65535), the second block values from 65536 to 2^17-1 (131071), and so on.
- Within a block, encode each value's 16 lowest bits as a short (2 bytes)
- If a block has more than 4096 values, encode it as a bitmap, otherwise as an array of shorts. 4096 is the threshold at which a bitmap starts being more memory efficient than an array.
Example:
Doc ids: 1000, 62101, 131385, 132052, 191173, 196658
Split as (high 16 bits, low 16 bits): (0, 1000) (0, 62101) (2, 313) (2, 980) (2, 60101) (3, 50)
Resulting blocks: block 0 → [1000, 62101], block 2 → [313, 980, 60101], block 3 → [50]
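A minimal sketch of this split-and-choose encoding, reusing the doc ids from the example (real roaring bitmap implementations have more container types and optimizations):

```python
from collections import defaultdict

ARRAY_TO_BITMAP_THRESHOLD = 4096  # beyond this, a fixed 2^16-bit bitmap is cheaper

def roaring_encode(doc_ids):
    blocks = defaultdict(list)
    for doc_id in sorted(doc_ids):
        blocks[doc_id >> 16].append(doc_id & 0xFFFF)  # (high 16 bits, low 16 bits)
    encoded = {}
    for high, lows in blocks.items():
        if len(lows) > ARRAY_TO_BITMAP_THRESHOLD:
            bitmap = bytearray(8192)  # 65536 bits
            for low in lows:
                bitmap[low >> 3] |= 1 << (low & 7)
            encoded[high] = ("bitmap", bytes(bitmap))
        else:
            encoded[high] = ("array", lows)  # sorted 2-byte values
    return encoded

print(roaring_encode([1000, 62101, 131385, 132052, 191173, 196658]))
# {0: ('array', [1000, 62101]), 2: ('array', [313, 980, 60101]), 3: ('array', [50])}
```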
Filter caches are roaring bit sets. When a filter is used frequently enough on a big enough segment, it will be kept in memory. Each time a new document is indexed, the bitset is updated so filters are real time.
Each filter is cached independently of the rest of the query, so its bitset can be reused regardless of what surrounds the filter.
{
  "bool": {
    "must": [
      { "term": { "folder": "inbox" }}, (1)
      { "term": { "read": false }}
    ]
  }
}

{
  "bool": {
    "must_not": {
      "term": { "folder": "inbox" } (2)
    },
    "must": {
      "term": { "important": true }
    }
  }
}
(1) and (2) are identical and will use the same bitset.
Cache eviction is done on an LRU basis.
But first let's refresh our memory about how Compressed Oops work.
There are 3 problems here:
- A 32-bit architecture only lets you address around 4GB of heap, which is too little for modern applications like ES. However, 32-bit applications are usually faster than 64-bit ones.
- A 64-bit architecture maps heap memory addresses to 64 bits, double the size of a 32-bit architecture. Even though it lets you address up to 16 exabytes of memory, you just don't need that much, so a lot of those address bits are pure waste. Consequently your application uses more memory on a 64-bit arch than on a 32-bit arch.
- Nowadays memory size isn't the problem anymore; the limitation is the bandwidth we have for accessing data in memory.
When the CPU loads data from memory into its cache, it does so in words of 8 bytes. To get a higher probability of CPU cache hits, the JVM only places objects at addresses that are multiples of 8 bytes. In other words, an object address can only be 0, 8, 16, 24, ...
As an object can only be addressed at a multiple of 8 bytes, we can divide those addresses by 8 (2^3) to use less space and store the result: 1 maps to address 8, 2 maps to address 16, ... Encoding and decoding is just a matter of shifting 3 bits to the left or to the right, which is very fast. By storing this number instead of the actual address, we can address 2^(3+32) = 2^35 bytes = 32GB with 32 bits. With this memory alignment and compressed Oops, we are limited to about 4 billion objects within a 32GB heap.
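A minimal sketch of the zero-based encode/decode arithmetic, purely to illustrate the shifting; the JVM does this in its pointer load/store paths, not in Java code:

```python
OOP_SHIFT = 3  # objects are 8-byte aligned, so the low 3 bits are always 0

def compress(address: int) -> int:
    """64-bit object address -> 32-bit compressed oop (zero-based case)."""
    assert address % 8 == 0, "the JVM only places objects on 8-byte boundaries"
    return address >> OOP_SHIFT

def decompress(narrow_oop: int) -> int:
    """32-bit compressed oop -> 64-bit object address (zero-based case)."""
    return narrow_oop << OOP_SHIFT

# The largest address reachable through a 32-bit compressed oop:
print(decompress(2**32 - 1))  # 34359738360 bytes, i.e. just under 32GB
```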
Compressed Oops are enabled by default on heaps below 32GB.
Everything we discussed about compressed oops so far has assumed that the OS lets the JVM reserve its heap at a virtual address of zero. If the OS authorizes this allocation, everything works as described before: the conversion between compressed oops and oops is just a matter of scaling the value by 8. Otherwise, the JVM needs to add a base offset during decoding and subtract it during encoding. This has a huge impact on performance. The cutoff for using zero-based compressed oops varies between OSes, but 26GB is a conservative cutoff across a variety of OSes.
Code in case of non zero based compressed oops
if (<narrow-oop> == NULL)
<wide_oop> = NULL
else
<wide_oop> = <narrow-oop-base> + (<narrow-oop> << 3)
To know if you are using zero based compressed oops, run the JVM with the options: -XX:+UnlockDiagnosticVMOptions -XX:+PrintCompressedOopsMode
and look for the line
heap address: 0x000000011be00000, size: 27648 MB, zero based Compressed Oops
If zero based compressed oops isn't enabled you will see:
heap address: 0x0000000118400000, size: 28672 MB, Compressed Oops with base: 0x00000001183ff000
ES looks for this information by default and returns it in the node info API as well as in the startup logs.
Use as little heap as you can, so GCs are shorter and the OS can cache more files. G1 GC will be the default GC in Java 9; although it is optimized for big heaps, it has shown scary bugs with Lucene, so it is still not recommended for ES.
- Use doc values instead of field data wherever you can
- Disk-based norms. Lucene norms are part of the scoring computation. They are independent of the query, so they are computed at indexing time. Lucene supports storing them on disk
- Use doc values instead of multi-fields. Multi-fields are when you index a field in various ways, like text for full-text search and keyword for aggregations and sorting.
- Do not over-shard. The more shards a search request touches, the more overhead you pay.
- Avoid overly large bulk index requests (32MB is OK but 256MB is not)
- Page your search, do not request too many hits at a time
- Avoid requesting too many aggregation buckets or deeply nested aggregations
- Consider trading performance for memory and use breadth_first collection mode for deep aggregations.
2 access patterns:
- Search: you have a term and are looking for matching documents. Inverted index is your friend!
- Sorting, aggregating, accessing a specific field: you have the documents and you are looking for a field in them. The inverted index is your enemy!
Doc values are your friend for the latter case. At index time, your documents are stored on disk in a column-oriented fashion, which makes them easy to sort and aggregate. Almost all field types support this feature and it is enabled by default. Analyzed fields cannot be stored as doc values.
doc values are generated per segment and are consequently immutable and serialized on disk. They are cached by the OS the same way segments are.
As data is stored by column, each column can be compressed in its own efficient way. In the case of a column of numbers, ES will look for the greatest common divisor and divide each number by it to use less space. For example, a column with values [100, 400, 1500] would be compressed to [1, 4, 15].
The following compression schemes are checked in order:
- If all the values are identical (or missing), set a flag and record the value
- If there are fewer than 256 values, a simple encoding table is used
- If there are more than 256 values, check if there is a common divisor
- If there is no common divisor, encode each value as an offset from the smallest value (a sketch of this decision chain follows the list)
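A hedged sketch of this decision chain for one numeric column, assuming that "fewer than 256 values" means distinct values; the real Lucene doc-values writers have more formats and heuristics:

```python
from functools import reduce
from math import gcd

def choose_numeric_encoding(values):
    """Pick an encoding for a doc-values column, following the order above."""
    distinct = set(values)
    if len(distinct) == 1:
        return ("constant", values[0])
    if len(distinct) < 256:  # assumption: the threshold applies to distinct values
        table = sorted(distinct)
        return ("table", table, [table.index(v) for v in values])
    divisor = reduce(gcd, values)
    if divisor > 1:
        return ("gcd", divisor, [v // divisor for v in values])
    smallest = min(values)
    return ("offset", smallest, [v - smallest for v in values])

# 300 distinct multiples of 100: too many for a table, but they share a divisor.
column = [100 * i for i in range(1, 301)]
print(choose_numeric_encoding(column)[:2])  # ('gcd', 100)
```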
Strings are de-duplicated, sorted into a table and assigned an id. Those ids are then used as numeric doc values. Once they are numeric, they benefit from all the compression schemes described above.
For aggregation, you can use multi-fields: you index the field both as an analyzed string and as a raw non-analyzed string.
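A hedged sketch of such a multi-field mapping; the index and field names are hypothetical, and the syntax follows more recent ES versions (text/keyword types, no mapping type), whereas older versions nest this under a type name as in the routing example further down:

```python
import requests

# "city" is analyzed for full-text search; "city.raw" is a non-analyzed
# keyword backed by doc values, usable for sorting and aggregations.
requests.put(
    "http://localhost:9200/city_index",
    json={
        "mappings": {
            "properties": {
                "city": {
                    "type": "text",
                    "fields": {"raw": {"type": "keyword"}},
                }
            }
        }
    },
)
```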
Doc values are good for strings that are not too big. An analyzed string can be a PDF with thousands of words; that's why doc values don't handle analyzed strings. Instead, ES uses another structure, fielddata, which lives in the heap. As a consequence, it is less scalable and has several edge cases.
You can use custom routing:
- When creating your index, you mark _routing as required
- For each document, you specify the value for routing at index time
- You specify the routing value at query time
Doing so, you limit the number of shards queried and speed up the search process. However, be careful not to create hot spots!
Example from datadog article:
curl -XPUT "localhost:9200/blog_index" -d '
{
"mappings": {
"blogger": {
"_routing": {
"required": true
}
}
}
}'
curl -XPUT "localhost:9200/blog_index/blogger/1?routing=blogger1" -d '
{
"comment": "blogger1 made this cool comment"
}'
curl -XGET "localhost:9200/blog_index/_search?routing=blogger1" -d '
{
"query": {
"match": {
"comment": {
"query": "cool comment"
}
}
}
}'
One primary shard per node is a good idea, so you spread the indexing load evenly. Fewer than one primary shard per node will create hotspots, as nodes without a primary shard will be less stressed while nodes with primary shards handle more index requests. Be careful not to put too many shards per node either, as that will stress the nodes.
- https://www.elastic.co/blog/a-heap-of-trouble
- http://docs.oracle.com/javase/7/docs/technotes/guides/vm/performance-enhancements-7.html#zeroBasedCompressedOop
- https://wiki.openjdk.java.net/display/HotSpot/CompressedOops
- https://en.wikipedia.org/wiki/Data_structure_alignment
- https://blog.codecentric.de/en/2014/02/35gb-heap-less-32gb-java-jvm-memory-oddities/
- Doc values ES doc