Skip to content

Instantly share code, notes, and snippets.

@ebuildy
Created October 23, 2016 10:53
Show Gist options
  • Save ebuildy/aa0aa371a572da25ffe0ff6b60302b70 to your computer and use it in GitHub Desktop.
Save ebuildy/aa0aa371a572da25ffe0ff6b60302b70 to your computer and use it in GitHub Desktop.
<?php
/**
* A simple PHP script that removes duplicate documents from Elasticsearch (> 2.0).
*
* Usage:
* php remove_dup.php HOST INDEX(/TYPE) QUERY_STRING FIELD REFRESH_TIMEOUT
*
* The script uses a terms aggregation with a nested top_hits aggregation to find
* documents that share the same FIELD value, then bulk-deletes all but one of them by id.
*
* <!> This can take a very long time: after each bulk delete the script has to wait
* REFRESH_TIMEOUT (the index refresh_interval, in seconds) before it can search again <!>
*/
// ---------------------------------------------------------------------------
// Command-line arguments
// ---------------------------------------------------------------------------
// Fail fast with a usage message instead of running with undefined values.
if (!isset($argv) || count($argv) < 6)
{
    fwrite(STDERR, sprintf(
        'Usage: php %s HOST INDEX(/TYPE) QUERY_STRING FIELD REFRESH_TIMEOUT%s',
        isset($argv[0]) ? basename($argv[0]) : 'remove_dup.php',
        PHP_EOL
    ));
    exit(2);
}

// $argHost, $argIndex and $argPauseTime are read as globals by queryES()
// and removeDuplicates(), so their names must not change.
$argHost = $argv[1];
$argIndex = $argv[2];
$argQuery = $argv[3];
$argField = $argv[4];
// Number of seconds to wait for the index refresh; normalised to an integer
// because it ends up as the argument of sleep().
$argPauseTime = max(0, (int) $argv[5]);

// ---------------------------------------------------------------------------
// Search request
// ---------------------------------------------------------------------------
// Query clause selecting the documents to inspect. It is placed directly under
// the top-level "query" key of the search body (the original wrapped it in a
// second, redundant "query" level).
$esQueryQuery = [
    'query_string' => [
        'query' => $argQuery,
    ],
];

// One bucket per value of $argField that occurs in at least two documents;
// the nested top_hits aggregation returns the documents of each bucket.
$esFindDuplicatesAgg = [
    'dup' => [
        'terms' => [
            'field' => $argField,
            'min_doc_count' => 2,
        ],
        'aggs' => [
            'doc' => [
                'top_hits' => [
                    'fields' => [],
                    'size' => 1000,
                ],
            ],
        ],
    ],
];

$esQuery = [
    'size' => 0,
    'query' => $esQueryQuery,
    'aggs' => $esFindDuplicatesAgg,
];

// filter_path trims the response down to the document ids and the counter of
// documents that did not make it into the returned buckets.
$searchService = '_search?filter_path=aggregations.dup.buckets.doc.hits.hits._id,aggregations.dup.sum_other_doc_count';

// ---------------------------------------------------------------------------
// Main loop: search for duplicates, delete them, repeat until none are left
// ---------------------------------------------------------------------------
while (true)
{
    $response = queryES($searchService, $esQuery);

    // A null response means the request or the JSON decoding failed; that is
    // an error, not "no duplicates left".
    if ($response === null)
    {
        fwrite(STDERR, sprintf('No usable response from Elasticsearch, aborting.%s', PHP_EOL));
        exit(1);
    }

    // Elasticsearch reported a problem (bad query, unknown index, ...).
    if (isset($response->error))
    {
        fwrite(STDERR, sprintf('Elasticsearch error: %s%s', json_encode($response->error), PHP_EOL));
        exit(1);
    }

    // No bucket in the response: no value of the field is duplicated any more.
    if (!isset($response->aggregations->dup->buckets))
    {
        echo sprintf('We are done!%s', PHP_EOL);
        exit(0);
    }

    $buckets = $response->aggregations->dup->buckets;
    $otherDocCount = isset($response->aggregations->dup->sum_other_doc_count)
        ? $response->aggregations->dup->sum_other_doc_count
        : 0;

    // sum_other_doc_count counts documents (not buckets) outside the returned buckets.
    echo sprintf('- Fetch %d buckets - Other %d documents %s', count($buckets), $otherDocCount, PHP_EOL);

    // Collect the ids to delete from every bucket first, so that a single bulk
    // request (and a single refresh pause) is issued per search round.
    $duplicates = [];

    foreach ($buckets as $bucket)
    {
        if (!isset($bucket->doc->hits->hits))
        {
            continue;
        }

        $hits = $bucket->doc->hits->hits;

        if (count($hits) < 2)
        {
            continue;
        }

        // Skip the first hit: that document is the one we keep.
        foreach (array_slice($hits, 1) as $hit)
        {
            $duplicates[] = $hit->_id;
        }
    }

    // The same id could be collected from more than one bucket.
    $duplicates = array_values(array_unique($duplicates));

    // Nothing to delete although buckets were returned: stop instead of
    // re-running the same search in a tight loop.
    if (count($duplicates) === 0)
    {
        echo sprintf('No deletable duplicates found, stopping.%s', PHP_EOL);
        exit(0);
    }

    echo sprintf('%s Removing %d documents %s', "\t", count($duplicates), PHP_EOL);

    // Sends the bulk delete and waits for the index refresh.
    removeDuplicates($duplicates);
}
/**
 * Delete the given documents with one _bulk request, then pause so that the
 * deletions are visible to the next search.
 *
 * The pause length is the global $argPauseTime plus one second. Nothing is
 * sent and no pause happens when the list is empty.
 *
 * @param array $duplicates Ids of the documents to delete.
 *
 * @return void
 */
function removeDuplicates($duplicates)
{
    global $argPauseTime;

    // An empty bulk body is rejected by Elasticsearch, and there is nothing
    // to wait for when nothing was deleted.
    if (empty($duplicates))
    {
        return;
    }

    $bulkData = '';

    foreach ($duplicates as $duplicate)
    {
        // The bulk body is newline-delimited JSON: one action per line, each
        // terminated by "\n" (including the last one).
        $bulkData .= json_encode(['delete' => ['_id' => $duplicate]]) . "\n";
    }

    $r = queryES('_bulk', $bulkData);

    // Report failures instead of silently ignoring the bulk response; the
    // caller's next search will find the documents again if they survived.
    if (!is_object($r) || isset($r->error) || !empty($r->errors))
    {
        fwrite(STDERR, sprintf(
            'Bulk delete reported a problem: %s%s',
            is_object($r) && isset($r->error) ? json_encode($r->error) : 'see bulk response items',
            PHP_EOL
        ));
    }

    // Wait refresh_interval + 1 seconds
    sleep((int) $argPauseTime + 1);
}
/**
 * Simple wrapper to query Elasticsearch via PHP curl.
 *
 * Sends $data as the body of a POST request to
 * http://$argHost:9200/$argIndex/$service (both taken from the globals
 * $argHost and $argIndex) and returns the JSON-decoded response.
 *
 * @param string       $service Path below the index, e.g. "_bulk" or "_search?...";
 *                              a leading slash is tolerated.
 * @param string|array $data    Request body; strings are sent as-is, anything
 *                              else is JSON-encoded first.
 *
 * @return mixed The decoded response (objects as stdClass), or null when the
 *               request could not be sent or the response is not valid JSON.
 */
function queryES($service, $data)
{
    global $argHost, $argIndex;

    // Trim the slashes around both parts so the URL never contains "//",
    // whichever way the caller wrote the index and the service path.
    $url = sprintf(
        'http://%s:9200/%s/%s',
        $argHost,
        trim($argIndex, '/'),
        ltrim($service, '/')
    );

    $payload = is_string($data) ? $data : json_encode($data);

    if ($payload === false)
    {
        fwrite(STDERR, sprintf('Unable to encode request body: %s%s', json_last_error_msg(), PHP_EOL));

        return null;
    }

    $ch = curl_init($url);

    if ($ch === false)
    {
        fwrite(STDERR, sprintf('Unable to initialise curl for %s%s', $url, PHP_EOL));

        return null;
    }

    curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
    curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST');
    curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
    // Without an explicit header curl labels a string body as
    // application/x-www-form-urlencoded, which is wrong for a JSON body and
    // is rejected by Elasticsearch versions that enforce the content type.
    curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);

    $rawResponse = curl_exec($ch);

    if ($rawResponse === false)
    {
        // Read the error before the handle is closed.
        fwrite(STDERR, sprintf('Request to %s failed: %s%s', $url, curl_error($ch), PHP_EOL));
        curl_close($ch);

        return null;
    }

    curl_close($ch);

    $response = json_decode($rawResponse);

    if ($response === null && json_last_error() !== JSON_ERROR_NONE)
    {
        fwrite(STDERR, sprintf('Invalid JSON received from %s: %s%s', $url, json_last_error_msg(), PHP_EOL));
    }

    return $response;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment