Skip to content

Instantly share code, notes, and snippets.

Created November 3, 2017 18:09
Show Gist options
  • Save anonymous/5db9369f2a118efb51a9514911f080f3 to your computer and use it in GitHub Desktop.
Save anonymous/5db9369f2a118efb51a9514911f080f3 to your computer and use it in GitHub Desktop.
package app
import io.elasticsearch.DocumentIndex
import io.mongodb.DocumentCollection
import org.bson.Document
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.metadata.AliasMetaData
import org.elasticsearch.common.collect.ImmutableOpenMap
import org.elasticsearch.index.query.QueryBuilder
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.index.reindex.UpdateByQueryAction
import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder
import org.elasticsearch.script.Script
import org.elasticsearch.script.ScriptService
import util.PayoutFailsafe
/**
* Created by [email protected] on 11/3/17.
*/
class UpdateClicks {
static void main(String[] args) {
def guaranteeOffers = getOfferGuarantee()
def inhousePublishers = getPublisherInhouse()
DocumentIndex clickIndex = new DocumentIndex('es://localhost:19300/clicks')
def clickIndices = getIndicesFromAlias('clicks', clickIndex.delegate).sort()
clickIndices.each { clickIndexName ->
def query = """
SELECT * FROM $clickIndexName
GROUP BY offer_id, publisher_id
LIMIT 10000
"""
def offerPublishers = clickIndex.prepareQueryDocument(query).get().rows
offerPublishers.each { offerPublisher ->
def offerId = offerPublisher.offer_id as String
def publisherId = offerPublisher.publisher_id as String
QueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery('offer_id', offerId))
.must(QueryBuilders.matchQuery('publisher_id', publisherId))
def script = """
ctx._source.offer_guarantee = offer_guarantee;ctx._source.inhouse = inhouse;
"""
def params = [
offer_guarantee: offerId in guaranteeOffers,
inhouse : publisherId in inhousePublishers
]
def result = PayoutFailsafe.instance.get {
UpdateByQueryRequestBuilder updateRequestBuilder = UpdateByQueryAction.INSTANCE.newRequestBuilder(clickIndex.delegate)
updateRequestBuilder.source(clickIndexName)
.filter(queryBuilder)
.script(new Script(script, ScriptService.ScriptType.INLINE, null, params))
.get()
}
if (result.indexingFailures){
result.indexingFailures.each {println it.toString()}
return null
}
if (result.searchFailures){
result.searchFailures.each {println it.toString()}
return null
}
println "Updated index $clickIndexName with pub_id = $publisherId and offer_id = $offerId"
}
sleep(60000)
}
}
static List<String> getIndicesFromAlias(String alias, Client client) {
ImmutableOpenMap<String, List<AliasMetaData>> aliases = ((GetAliasesResponse) client.admin().indices().getAliases(new GetAliasesRequest(alias)).actionGet()).getAliases()
ArrayList<String> allIndices = new ArrayList()
aliases.keysIt().forEachRemaining { index -> allIndices.add(index) }
return allIndices
}
static Set<String> getOfferGuarantee() {
DocumentCollection offerCollection = new DocumentCollection('mongodb://adflexmeta_ro:a45leXmRtarW@localhost:57017/adflex_meta.offers')
def filter = [guarantee: true] as Document
def projection = [id: 1] as Document
offerCollection.find(filter).projection(projection).intoDocuments().thenApply { offers ->
return (offers*.id as List<String>).toSet()
}.join()
}
static Set<String> getPublisherInhouse() {
DocumentCollection publisherCollection = new DocumentCollection('mongodb://adflexmeta_ro:a45leXmRtarW@localhost:57017/adflex_meta.publishers')
def filter = [inhouse: true] as Document
def projection = [id: 1] as Document
publisherCollection.find(filter).projection(projection).intoDocuments().thenApply { publishers ->
return (publishers*.id as List<String>).toSet()
}.join()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment