Created
November 10, 2015 22:43
-
-
Save stickfigure/1a345f9a8a56897da6a0 to your computer and use it in GitHub Desktop.
Scatter mapping using Objectify
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.foo.housekeeping; | |
import com.google.appengine.api.datastore.Entity; | |
import com.google.common.collect.Range; | |
import com.googlecode.objectify.Key; | |
import java.util.ArrayList; | |
import java.util.Collections; | |
import java.util.List; | |
import static com.googlecode.objectify.ObjectifyService.ofy; | |
/** | |
* See: | |
* https://github.com/GoogleCloudPlatform/appengine-mapreduce/wiki/ScatterPropertyImplementation | |
* https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/java/src/main/java/com/google/appengine/tools/mapreduce/inputs/DatastoreShardStrategy.java | |
*/ | |
public class Scatter { | |
public static final int DEFAULT_COUNT = 500; | |
/** | |
* @return a set of key ranges that represent count chunks of | |
*/ | |
public static <T> List<Range<Key<T>>> splits(final Class<T> type) { | |
return splits(type, DEFAULT_COUNT); | |
} | |
/** | |
* @return a set of key ranges that represent count chunks of | |
*/ | |
public static <T> List<Range<Key<T>>> splits(final Class<T> type, final int count) { | |
final List<Key<T>> scatteredKeys = ofy().load().type(type).order(Entity.SCATTER_RESERVED_PROPERTY).limit(count - 1).keys().list(); | |
if (scatteredKeys.isEmpty()) | |
return Collections.emptyList(); | |
Collections.sort(scatteredKeys); | |
final List<Range<Key<T>>> points = new ArrayList<>(count); | |
Key<T> last = null; | |
for (final Key<T> here : scatteredKeys) { | |
final Range<Key<T>> range = last == null ? Range.atMost(here) : Range.closedOpen(last, here); | |
points.add(range); | |
last = here; | |
} | |
points.add(Range.atLeast(last)); | |
return points; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.foo.housekeeping; | |
import com.google.appengine.api.taskqueue.DeferredTask; | |
import com.google.common.base.Function; | |
import com.google.common.collect.Iterables; | |
import com.google.common.collect.Range; | |
import com.googlecode.objectify.Key; | |
import com.googlecode.objectify.cmd.Query; | |
import lombok.RequiredArgsConstructor; | |
import lombok.extern.slf4j.Slf4j; | |
import static com.googlecode.objectify.ObjectifyService.ofy; | |
/** | |
* Takes a key range and creates task instances for each. | |
* | |
* We put this all on a configurable queue | |
*/ | |
@Slf4j | |
@RequiredArgsConstructor | |
abstract public class ScatterMapperShardTask<E, T extends DeferredTask> implements DeferredTask { | |
private static final long serialVersionUID = 1L; | |
protected final Class<E> type; | |
protected final Range<Key<E>> range; | |
/** */ | |
@Override | |
public final void run() { | |
log.info("Mapping {}", range); | |
Query<E> query = ofy().load().type(type); | |
if (range.hasLowerBound()) | |
query = query.filterKey(">=", range.lowerEndpoint()); | |
if (range.hasUpperBound()) | |
query = query.filterKey("<", range.upperEndpoint()); | |
final Iterable<Key<E>> keys = query.keys().iterable(); | |
queue().add(Iterables.transform(keys, new Function<Key<E>, T>() { | |
@Override | |
public T apply(final Key<E> input) { | |
return makeTask(input); | |
} | |
})); | |
log.info("Finished mapping"); | |
} | |
abstract protected T makeTask(Key<E> key); | |
abstract protected QueueHelper queue(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment