1 shard corresponds to 1 Spark partition.
Reading from ES: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/arch.html#arch-reading . Beware of increasing the number of shards on ES for performance reasons:
A common concern (read optimization) for improving performance is to increase the number of shards and thus increase the number of tasks on the Hadoop side. Unless such gains are demonstrated through benchmarks, we recommend against such a measure since in most cases, an Elasticsearch shard can easily handle data streaming to a Hadoop or Spark task.
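A minimal read sketch using the native integration (the node address and the artists index/type are assumptions for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

val conf = new SparkConf()
  .setAppName("es-read")
  .set("es.nodes", "localhost:9200") // assumption: local ES node
val sc = new SparkContext(conf)

// one Spark partition per (primary) shard being read
val artists = sc.esRDD("artists/_doc") // hypothetical index/type
println(artists.getNumPartitions)      // equals the number of shards read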
Writing to ES: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/arch.html#arch-writing . Write performance can be increased by having more partitions:
elasticsearch-hadoop detects the number of (primary) shards where the write will occur and distributes the writes between these. The more splits/partitions available, the more mappers/reducers can write data in parallel to Elasticsearch.
Note that nothing is said about ES automatically handling that number of writes. Production testing will determine what an ES cluster can handle in terms of write load.
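As a sketch, repartitioning before the write is how one would raise the number of parallel writers (reusing sc from the read sketch above; the partition count and index name are assumptions to be validated by testing, and saveToEs comes from the native integration discussed further below):

val docs = sc.parallelize(1 to 100000).map(i => Map("id" -> i))
docs
  .repartition(24)          // assumption: 24 writer tasks, to be tuned
  .saveToEs("artists/_doc") // each partition writes to ES in parallel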
Settings here: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html#configuration-serialization
In particular, es.batch.write.retry.count controls how many times a failed bulk write is retried; if the retries are exhausted and the write still fails, the whole Hadoop/Spark job fails. The default number of retries is 3. es.batch.size.entries and es.batch.size.bytes are 2 settings that can be increased to make writes more performant.
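A sketch of tuning these on a SparkConf (the values are illustrative, not recommendations; note that the batch settings apply per task instance, so total load on ES scales with the number of writer tasks):

val conf = new SparkConf()
  .set("es.batch.size.entries", "5000")    // default is 1000 docs per bulk request
  .set("es.batch.size.bytes", "5mb")       // default is 1mb
  .set("es.batch.write.retry.count", "3")  // default; the job fails once exhausted
  .set("es.batch.write.retry.wait", "10s") // default wait between retries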
Disable speculative execution, especially if we run into duplicate data:
speculative execution is an optimization, enabled by default, that allows Hadoop to create duplicate tasks for those it considers hung or slowed down. When doing data crunching or reading resources, having duplicate tasks is harmless and means at most a waste of computation resources; however, when writing data to an external store, this can cause data corruption through duplicates or unnecessary updates.
For Spark, disable it through spark.speculation=false (this is already the default).
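Setting it explicitly anyway makes the intent visible in the job config:

val conf = new SparkConf().set("spark.speculation", "false") // false is already the default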
The es-hadoop library has native Spark integration for writing RDDs and DFs but the API is only available for Scala and Java: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html .
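A minimal DataFrame write sketch (the RDD flavour was sketched above; the node address and index are assumptions):

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

val spark = SparkSession.builder()
  .appName("es-df-write")
  .config("es.nodes", "localhost:9200") // assumption: local ES node
  .getOrCreate()

val df = spark
  .createDataFrame(Seq(("u2", 1976), ("r.e.m.", 1980)))
  .toDF("name", "year")

df.saveToEs("artists/_doc") // implicit from org.elasticsearch.spark.sql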
Here is the main doc for writing using the Hadoop API: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/mapreduce.html#_emphasis_new_emphasis_literal_org_apache_hadoop_mapreduce_literal_api . The format expected is roughly (None, {k:v}):
EsOutputFormat expects a Map<Writable, Writable> representing a document value that is converted internally into a JSON document and indexed in Elasticsearch. Hadoop OutputFormat requires implementations to expect a key and a value however, since for Elasticsearch only the document (that is the value) is necessary, EsOutputFormat ignores the key.
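A sketch of driving EsOutputFormat from Spark through the new Hadoop API (reusing sc from above; the node address and index are assumptions, and the key is a NullWritable since EsOutputFormat ignores it):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{MapWritable, NullWritable, Text}
import org.elasticsearch.hadoop.mr.EsOutputFormat

val hadoopConf = new Configuration()
hadoopConf.set("es.nodes", "localhost:9200")  // assumption: local ES node
hadoopConf.set("es.resource", "artists/_doc") // hypothetical index

val writables = sc
  .parallelize(Seq(Map("name" -> "u2"), Map("name" -> "r.e.m.")))
  .map { fields =>
    val doc = new MapWritable()
    fields.foreach { case (k, v) => doc.put(new Text(k), new Text(v)) }
    (NullWritable.get(), doc) // key is ignored, only the document matters
  }

writables.saveAsNewAPIHadoopFile(
  "-", // dummy path: EsOutputFormat does not write to the filesystem
  classOf[NullWritable], classOf[MapWritable],
  classOf[EsOutputFormat], hadoopConf)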
Not sure if one should also disable speculative execution when using the Hadoop API directly:
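// old mapred property names; the Hadoop 2+ equivalents are
// mapreduce.map.speculative and mapreduce.reduce.speculative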
conf.setBoolean("mapred.map.tasks.speculative.execution", false) 
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false)
Summary of this page: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/performance.html
- read performance:
- the number of results does not influence the performance of the connector or of Elasticsearch itself.
- increasing read performance by adding more shards will most likely be beneficial only if the plan is to scale out, i.e. add more machines to handle the increased number of shards.
- write performance:
- a bulk request should not take longer than 1-2s to be successfully processed. If it takes more, decrease the size of the bulk request; if it takes less, consider slowly increasing it.
- increasing the number of retries and the wait between them often masks the problem of an overloaded cluster; it doesn't solve it.
- Limit the number of tasks writing to ES if your cluster is too small or the number of write tasks from Spark is too large (see the sketch after this list).
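Sketch for that last point, capping the writer tasks with coalesce, which unlike repartition avoids a full shuffle (reusing docs from the write sketch above; the count and index are assumptions):

docs
  .coalesce(8)              // assumption: the ES cluster comfortably handles 8 writers
  .saveToEs("artists/_doc") // hypothetical index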
Index-tuning guide from ES: https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-indexing-speed.html