Created
May 2, 2012 18:21
-
-
Save jprante/2578923 to your computer and use it in GitHub Desktop.
Performing multithreaded asynchronous bulk writes to Elasticsearch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to Jörg Prante and xbib under one or more contributor | |
* license agreements. See the NOTICE.txt file distributed with this work | |
* for additional information regarding copyright ownership. | |
* | |
* Copyright (C) 2012 Jörg Prante and xbib | |
* | |
* This program is free software; you can redistribute it and/or modify | |
* it under the terms of the GNU Affero General Public License as published | |
* by the Free Software Foundation; either version 3 of the License, or | |
* (at your option) any later version. | |
* This program is distributed in the hope that it will be useful, | |
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
* GNU Affero General Public License for more details. | |
* | |
* You should have received a copy of the GNU Affero General Public License | |
* along with this program; if not, see http://www.gnu.org/licenses | |
* or write to the Free Software Foundation, Inc., 51 Franklin Street, | |
* Fifth Floor, Boston, MA 02110-1301 USA. | |
* | |
* The interactive user interfaces in modified source and object code | |
* versions of this program must display Appropriate Legal Notices, | |
* as required under Section 5 of the GNU Affero General Public License. | |
* | |
* In accordance with Section 7(b) of the GNU Affero General Public | |
* License, these Appropriate Legal Notices must retain the display of the | |
* "Powered by xbib" logo. If the display of the logo is not reasonably | |
* feasible for technical reasons, the Appropriate Legal Notices must display | |
* the words "Powered by xbib". | |
*/ | |
package org.xbib.elasticsearch; | |
import java.io.IOException; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.logging.Level; | |
import java.util.logging.Logger; | |
import org.elasticsearch.action.ActionListener; | |
import org.elasticsearch.action.bulk.BulkResponse; | |
import org.elasticsearch.client.Client; | |
import org.elasticsearch.client.Requests; | |
import org.elasticsearch.client.action.bulk.BulkRequestBuilder; | |
import org.xbib.io.Identifiable; | |
import org.xbib.io.Session; | |
import org.xbib.rdf.Resource; | |
/** | |
* Write bulk data to Elasticsearch | |
* | |
* @author <a href="mailto:[email protected]">Jörg Prante</a> | |
*/ | |
public class BulkWrite extends AbstractWrite { | |
/** the logger */ | |
private static final Logger logger = Logger.getLogger(BulkWrite.class.getName()); | |
private int bulkSize = 100; | |
private int maxActiveRequests = 30; | |
private long millisBeforeContinue = 60000L; | |
private int totalTimeouts; | |
private static final int MAX_TOTAL_TIMEOUTS = 10; | |
private static final AtomicInteger onGoingBulks = new AtomicInteger(0); | |
private static final AtomicInteger counter = new AtomicInteger(0); | |
private BulkRequestBuilder currentBulk; | |
public BulkWrite(String index, String type) { | |
super(index, type, ':'); | |
this.totalTimeouts = 0; | |
} | |
public BulkWrite setBulkSize(int bulkSize) { | |
this.bulkSize = bulkSize; | |
return this; | |
} | |
public BulkWrite setMaxActiveRequests(int maxActiveRequests) { | |
this.maxActiveRequests = maxActiveRequests; | |
return this; | |
} | |
public BulkWrite setMillisBeforeContinue(long millis) { | |
this.millisBeforeContinue = millis; | |
return this; | |
} | |
/** | |
* Write resource to Elasticsearch in bulk mode | |
* | |
* @param session the session | |
* @param resource the resource | |
* @throws IOException | |
*/ | |
@Override | |
public void write(ElasticsearchSession session, Resource resource) throws IOException { | |
if (!session.isOpen()) { | |
throw new IOException("session not open"); | |
} | |
Client client = session.getClient(); | |
if (currentBulk == null) { | |
currentBulk = client.prepareBulk(); | |
} | |
build(resource); | |
if (resource.isDeleted()) { | |
currentBulk.add(Requests.deleteRequest(index).type(type).id(resource.getIdentifier().getFragment())); | |
} else { | |
currentBulk.add(Requests.indexRequest(index).type(type).id(resource.getIdentifier().getFragment()).create(false).source(getBuilder())); | |
} | |
if (currentBulk.numberOfActions() >= bulkSize) { | |
processBulk(client); | |
} | |
} | |
@Override | |
public void flush(ElasticsearchSession session) throws IOException { | |
if (currentBulk == null) { | |
return; | |
} | |
if (totalTimeouts > MAX_TOTAL_TIMEOUTS) { | |
// waiting some minutes is much too long, do not wait any longer | |
throw new IOException("total flush() timeouts exceeded limit of + " + MAX_TOTAL_TIMEOUTS + ", aborting"); | |
} | |
if (currentBulk.numberOfActions() > 0) { | |
processBulk(session.getClient()); | |
} | |
// wait for all outstanding bulk requests | |
while (onGoingBulks.intValue() > 0) { | |
logger.log(Level.INFO, "waiting for {0} active bulk requests", onGoingBulks); | |
synchronized (onGoingBulks) { | |
try { | |
onGoingBulks.wait(millisBeforeContinue); | |
} catch (InterruptedException e) { | |
logger.log(Level.WARNING, "timeout while waiting, continuing after {0} ms", millisBeforeContinue); | |
totalTimeouts++; | |
} | |
} | |
} | |
} | |
private void processBulk(Client client) { | |
while (onGoingBulks.intValue() >= maxActiveRequests) { | |
logger.log(Level.INFO, "waiting for {0} active bulk requests", onGoingBulks); | |
synchronized (onGoingBulks) { | |
try { | |
onGoingBulks.wait(millisBeforeContinue); | |
} catch (InterruptedException e) { | |
logger.log(Level.WARNING, "timeout while waiting, continuing after {0} ms", millisBeforeContinue); | |
totalTimeouts++; | |
} | |
} | |
} | |
int currentOnGoingBulks = onGoingBulks.incrementAndGet(); | |
final int numberOfActions = currentBulk.numberOfActions(); | |
logger.log(Level.INFO, "submitting new bulk index request ({0} docs, {1} requests currently active)", new Object[]{numberOfActions, currentOnGoingBulks}); | |
try { | |
currentBulk.execute(new ActionListener<BulkResponse>() { | |
@Override | |
public void onResponse(BulkResponse bulkResponse) { | |
if (bulkResponse.hasFailures()) { | |
logger.log(Level.SEVERE, "bulk index has failures: {0}", bulkResponse.buildFailureMessage()); | |
} else { | |
final int totalActions = counter.addAndGet(numberOfActions); | |
logger.log(Level.INFO, "bulk index success ({0} millis, {1} docs, total of {2} docs)", new Object[]{bulkResponse.tookInMillis(), numberOfActions, totalActions}); | |
} | |
onGoingBulks.decrementAndGet(); | |
synchronized (onGoingBulks) { | |
onGoingBulks.notifyAll(); | |
} | |
} | |
@Override | |
public void onFailure(Throwable e) { | |
logger.log(Level.SEVERE, "bulk request failed", e); | |
} | |
}); | |
} catch (Exception e) { | |
logger.log(Level.SEVERE, "unhandled exception, failed to execute bulk request", e); | |
} finally { | |
currentBulk = client.prepareBulk(); | |
} | |
} | |
@Override | |
public void prepareExecution(Session session) throws IOException { | |
throw new UnsupportedOperationException("Not supported yet."); | |
} | |
@Override | |
public void create(ElasticsearchSession session, Identifiable identifiable, Resource resource) throws IOException { | |
write(session, resource); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment