@gianm
Created August 19, 2015 00:55
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.segment.realtime.plumber;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentNotWritableException;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.io.File;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
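
/**
 * A {@link Plumber} built on top of {@link Appenderator}. Rows are added to per-interval segments;
 * a scheduled "overseer" task periodically pushes and publishes segments that have fallen outside
 * the configured window period, and a server-view callback drops local segments once an assignable
 * server announces a segment of equal or higher version covering them (handoff).
 */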
public class RealtimePlumberTokyoDrift implements Plumber
{
  private static final EmittingLogger log = new EmittingLogger(RealtimePlumberTokyoDrift.class);

  private final Appenderator appenderator;
  private final DataSchema schema;
  private final RealtimeTuningConfig config;
  private final RejectionPolicy rejectionPolicy;
  private final FilteredServerView serverView;
  private final SegmentPublisher segmentPublisher;
  private final Object handoffMonitor = new Object();

  private volatile ExecutorService serverViewExecutor = null;
  private volatile ScheduledExecutorService scheduledExecutor = null;
  private volatile CommitterMaker lastCommitterMaker = null;
  private volatile boolean cleanShutdown = true;
  private volatile boolean shuttingDown = false;
  private volatile boolean stopped = false;

  public RealtimePlumberTokyoDrift(
      final Appenderator appenderator,
      final DataSchema schema,
      final RealtimeTuningConfig config,
      final FilteredServerView serverView,
      final SegmentPublisher segmentPublisher
  )
  {
    this.appenderator = appenderator;
    this.schema = schema;
    this.config = config.withBasePersistDirectory(
        makeBasePersistSubdirectory(
            config.getBasePersistDirectory(),
            schema.getDataSource(),
            config.getShardSpec()
        )
    );
    this.rejectionPolicy = config.getRejectionPolicyFactory().create(config.getWindowPeriod());
    this.serverView = serverView;
    this.segmentPublisher = segmentPublisher;
  }

  private static File makeBasePersistSubdirectory(
      final File basePersistDirectory,
      final String dataSource,
      final ShardSpec shardSpec
  )
  {
    final File dataSourceDirectory = new File(basePersistDirectory, dataSource);
    return new File(dataSourceDirectory, String.valueOf(shardSpec.getPartitionNum()));
  }

  @Override
  public void startJob()
  {
    appenderator.startJob();
    initializeExecutors();
    startPersistThread();
    registerServerViewCallback();
    mergeAndPush();
  }
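
  /**
   * Adds a row to the segment covering its timestamp. Returns the value reported by
   * {@code appenderator.add(...)}, or -1 if the row is rejected by the rejection policy or the
   * target segment has already started handoff and is no longer writable.
   */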
  @Override
  public int add(
      final InputRow row,
      final CommitterMaker committerMaker
  ) throws IndexSizeExceededException
  {
    if (!rejectionPolicy.accept(row.getTimestampFromEpoch())) {
      return -1;
    }

    final Interval segmentInterval = schema.getGranularitySpec()
                                           .getSegmentGranularity()
                                           .bucket(row.getTimestamp());

    final SegmentIdentifier segmentIdentifier = new SegmentIdentifier(
        appenderator.getDataSource(),
        segmentInterval,
        config.getVersioningPolicy().getVersion(segmentInterval),
        config.getShardSpec()
    );

    try {
      final int numRows = appenderator.add(segmentIdentifier, row, committerMaker);
      lastCommitterMaker = committerMaker;
      return numRows;
    }
    catch (SegmentNotWritableException e) {
      // Segment already started handoff
      return -1;
    }
  }

  @Override
  public <T> QueryRunner<T> getQueryRunner(Query<T> query)
  {
    return appenderator.getQueryRunner(query);
  }

  @Override
  public void persist(Committer committer)
  {
    appenderator.persistAll(committer);
  }
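
  /**
   * Shuts down the plumber: pushes any remaining data, waits on {@code handoffMonitor} until the
   * appenderator reports no pending segments, then stops the executors and closes the appenderator.
   * Throws an ISE if a push failed during shutdown ({@code cleanShutdown == false}).
   */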
  @Override
  public void finishJob()
  {
    log.info("Shutting down...");

    shuttingDown = true;

    List<SegmentIdentifier> pending = appenderator.getSegments();
    if (pending.isEmpty()) {
      log.info("No segments to hand off.");
    } else {
      log.info("Pushing segments: %s", Joiner.on(", ").join(pending));
    }

    try {
      if (lastCommitterMaker != null) {
        // Push all remaining data
        mergeAndPush();
      }

      synchronized (handoffMonitor) {
        while (!pending.isEmpty()) {
          log.info("Waiting to hand off: %s", Joiner.on(", ").join(pending));
          handoffMonitor.wait();
          pending = appenderator.getSegments();
        }
      }
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }
    finally {
      stopped = true;
      shutdownExecutors();
      appenderator.close();
    }

    if (!cleanShutdown) {
      throw new ISE("Exception occurred during persist and merge.");
    }
  }

  private void initializeExecutors()
  {
    if (scheduledExecutor == null) {
      scheduledExecutor = Execs.scheduledSingleThreaded("plumber_scheduled_%d");
    }
    if (serverViewExecutor == null) {
      serverViewExecutor = Execs.singleThreaded("plumber_serverview_%d");
    }
  }

  private void shutdownExecutors()
  {
    serverViewExecutor.shutdownNow();
    scheduledExecutor.shutdownNow();
  }
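
  /**
   * Schedules the merge-and-push "overseer" on the scheduled executor. The first run happens one
   * window period after the end of the current segment granularity bucket, and subsequent runs
   * repeat once per segment granularity period until the plumber is stopped.
   */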
  private void startPersistThread()
  {
    final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
    final Period windowPeriod = config.getWindowPeriod();
    final DateTime truncatedNow = segmentGranularity.truncate(new DateTime());
    final long windowMillis = windowPeriod.toStandardDuration().getMillis();

    log.info(
        "Expect to run at [%s]",
        new DateTime().plus(
            new Duration(
                System.currentTimeMillis(),
                segmentGranularity.increment(truncatedNow).getMillis() + windowMillis
            )
        )
    );

    ScheduledExecutors.scheduleAtFixedRate(
        scheduledExecutor,
        new Duration(
            System.currentTimeMillis(),
            segmentGranularity.increment(truncatedNow).getMillis() + windowMillis
        ),
        new Duration(truncatedNow, segmentGranularity.increment(truncatedNow)),
        new ThreadRenamingCallable<ScheduledExecutors.Signal>(
            String.format(
                "%s-overseer-%d",
                schema.getDataSource(),
                config.getShardSpec().getPartitionNum()
            )
        )
        {
          @Override
          public ScheduledExecutors.Signal doCall()
          {
            if (stopped) {
              log.info("Stopping merge-n-push overseer thread");
              return ScheduledExecutors.Signal.STOP;
            }

            mergeAndPush();

            if (stopped) {
              log.info("Stopping merge-n-push overseer thread");
              return ScheduledExecutors.Signal.STOP;
            } else {
              return ScheduledExecutors.Signal.REPEAT;
            }
          }
        }
    );
  }
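
  /**
   * Pushes and publishes segments. During normal operation only segments whose interval starts
   * before (current max time minus the window period, truncated to segment granularity) are
   * pushed; when shutting down, all segments are pushed. Publishing failures raise an alert and,
   * on shutdown, mark the shutdown unclean and drop the affected segments.
   */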
  private void mergeAndPush()
  {
    final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
    final Period windowPeriod = config.getWindowPeriod();
    final long windowMillis = windowPeriod.toStandardDuration().getMillis();

    log.info("Starting merge and push.");

    DateTime minTimestampAsDate = segmentGranularity.truncate(
        new DateTime(
            Math.max(windowMillis, rejectionPolicy.getCurrMaxTime().getMillis()) - windowMillis
        )
    );
    long minTimestamp = minTimestampAsDate.getMillis();

    final List<SegmentIdentifier> segments = appenderator.getSegments();
    final List<SegmentIdentifier> segmentsToPush = Lists.newArrayList();

    if (shuttingDown) {
      log.info(
          "Found [%,d] segments. Attempting to hand off all of them.",
          segments.size()
      );
      segmentsToPush.addAll(segments);
    } else {
      log.info(
          "Found [%,d] segments. Attempting to hand off segments that start before [%s].",
          segments.size(),
          minTimestampAsDate
      );
      for (SegmentIdentifier segment : segments) {
        final Long intervalStart = segment.getInterval().getStartMillis();
        if (intervalStart < minTimestamp) {
          log.info("Adding entry [%s] for merge and push.", segment);
          segmentsToPush.add(segment);
        } else {
          log.info(
              "Skipping persist and merge for entry [%s] : Start time [%s] >= [%s] min timestamp required in this run. Segment will be picked up in a future run.",
              segment,
              new DateTime(intervalStart),
              minTimestampAsDate
          );
        }
      }
    }

    log.info("Found [%,d] sinks to persist and merge", segmentsToPush.size());

    // WARNING: CommitterMakers.nil() here means that on-disk data can get out of sync with committing
    ListenableFuture<?> publishFuture = Futures.transform(
        appenderator.push(segmentsToPush, CommitterMakers.nil().makeCommitter()),
        new Function<SegmentsAndMetadata, Object>()
        {
          @Override
          public Object apply(SegmentsAndMetadata pushedSegmentsAndMetadata)
          {
            // Immediately publish after pushing
            for (DataSegment pushedSegment : pushedSegmentsAndMetadata.getSegments()) {
              try {
                segmentPublisher.publishSegment(pushedSegment);
              }
              catch (Exception e) {
                throw Throwables.propagate(e);
              }
            }
            return null;
          }
        }
    );

    Futures.addCallback(
        publishFuture,
        new FutureCallback<Object>()
        {
          @Override
          public void onSuccess(Object result)
          {
            log.info("Published [%,d] sinks.", segmentsToPush.size());
          }

          @Override
          public void onFailure(Throwable e)
          {
            final List<String> segmentIdentifierStrings = Lists.transform(
                segmentsToPush,
                new Function<SegmentIdentifier, String>()
                {
                  @Override
                  public String apply(SegmentIdentifier input)
                  {
                    return input.getIdentifierAsString();
                  }
                }
            );

            log.makeAlert(e, "Failed to publish merged indexes[%s]", schema.getDataSource())
               .addData("segments", segmentIdentifierStrings)
               .emit();

            if (shuttingDown) {
              // We're trying to shut down, and these segments failed to push. Let's just get rid of them.
              // This call will also delete possibly-partially-written files, so we don't need to do it explicitly.
              cleanShutdown = false;
              for (SegmentIdentifier identifier : segmentsToPush) {
                dropSegment(identifier);
              }
            }
          }
        }
    );
  }
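
  /**
   * Registers a callback on the server view so that, when an assignable server announces a segment
   * for this data source and partition whose interval covers one of our segments and whose version
   * is at least as new, the local segment is dropped, completing handoff.
   */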
  private void registerServerViewCallback()
  {
    serverView.registerSegmentCallback(
        serverViewExecutor,
        new ServerView.BaseSegmentCallback()
        {
          @Override
          public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
          {
            if (stopped) {
              log.info("Unregistering ServerViewCallback");
              return ServerView.CallbackAction.UNREGISTER;
            }

            if (!server.isAssignable()) {
              return ServerView.CallbackAction.CONTINUE;
            }

            log.debug("Checking segment[%s] on server[%s]", segment, server);

            if (schema.getDataSource().equals(segment.getDataSource())
                && config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum()) {
              final Interval interval = segment.getInterval();
              for (SegmentIdentifier identifier : appenderator.getSegments()) {
                final long sinkStart = identifier.getInterval().getStartMillis();
                if (interval.contains(sinkStart)) {
                  log.info("Segment[%s] matches sink[%s] on server[%s]", segment, identifier, server);
                  final String segmentVersion = segment.getVersion();
                  final String sinkVersion = identifier.getVersion();
                  if (segmentVersion.compareTo(sinkVersion) >= 0) {
                    log.info("Segment version[%s] >= sink version[%s]. Dropping sink.", segmentVersion, sinkVersion);
                    dropSegment(identifier);
                  }
                }
              }
            }

            return ServerView.CallbackAction.CONTINUE;
          }
        },
        new Predicate<DataSegment>()
        {
          @Override
          public boolean apply(final DataSegment segment)
          {
            return schema.getDataSource().equalsIgnoreCase(segment.getDataSource())
                   && config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum()
                   && Iterables.any(
                       appenderator.getSegments(),
                       new Predicate<SegmentIdentifier>()
                       {
                         @Override
                         public boolean apply(SegmentIdentifier identifier)
                         {
                           return segment.getInterval().contains(identifier.getInterval().getStartMillis());
                         }
                       }
                   );
          }
        }
    );
  }

  private void dropSegment(final SegmentIdentifier identifier)
  {
    log.info("Dropping segment: %s", identifier);

    Futures.addCallback(
        appenderator.drop(identifier),
        new FutureCallback<Object>()
        {
          @Override
          public void onSuccess(Object result)
          {
            synchronized (handoffMonitor) {
              handoffMonitor.notifyAll();
            }
          }

          @Override
          public void onFailure(Throwable e)
          {
            log.warn(e, "Failed to drop segment: %s", identifier);
            synchronized (handoffMonitor) {
              handoffMonitor.notifyAll();
            }
          }
        }
    );
  }
}