Created May 1, 2014
Experimental code demonstrating arbitrary mappers and reducers in the mapreduce library
import collections
import jinja2
import logging
import os

import request_handler
import third_party.mapreduce
import third_party.mapreduce.control  # start_map() lives in the control submodule
import third_party.mapreduce.input_readers
import third_party.mapreduce.output_writers
import third_party.mapreduce.lib.files
import third_party.mapreduce.operation
from google.appengine.ext import db

import compat_key
import content.models
import content.publish
import layer_cache
import setting_model
import user_models
import user_util
RAW_DATA = [
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
    # 19, 20, 21, ... 999 -- the rest of the range is commented out, so the
    # demo below only processes 0 through 18.
]

# TODO(mattfaus): Capture this in a wiki page on https://code.google.com/p/appengine-mapreduce/w/list
#
# Below is a description of how RAW_DATA passes through the mapreduce library
# and winds up in a blob file.
#
# 1. StartJobHandler._start_map() calls validate() with the full MapperSpec on
#    both the input_reader and the output_writer:
#      MapperSpec(advanced_mapreduce.data_processor, advanced_mapreduce.CustomMapper, {'input_reader': {'foo': 'bar'}, 'processing_rate': 128, 'output_writer': {'output_sharding': 'none', 'filesystem': 'blobstore'}}, 32)
# 1a. StartJobHandler._start_map() verifies that the handler_spec resolves to a proper handler
# 1b. StartJobHandler._start_map() creates a new model.MapreduceState entity
# 1c. "" adds a call to kickoffjob_callback to the queue_name
# 2. KickOffJobHandler.handle() calls InputReader.split_input()
#    Note: It's OK if there are no InputReaders returned by split_input()
#      split_input got MapperSpec(advanced_mapreduce.data_processor, advanced_mapreduce.CustomMapper, {u'input_reader': {u'foo': u'bar'}, u'processing_rate': 128, u'output_writer': {u'output_sharding': u'none', u'filesystem': u'blobstore'}}, 32)
#      split_input got params {'foo': u'bar'}
#    Which, in turn, calls InputReader.__init__() for each of the input shards
# 2a. "" calls OutputWriter.init_job()
# 2b. "" for each InputReader returned by split_input(), calls
#     OutputWriter.create() and adds the result to the list of OutputWriters
#     NOTE: The number of InputReaders returned by split_input() determines the number of shards
# 3. to_json() is called on all of the CustomMapper instances that split_input() returned,
#    presumably to distribute them across the cluster
# 4. from_json() is called on all the shards, presumably to resume that shard's work
# 4b. CustomMapper.__init__ is called (from within from_json())
# 5. __iter__ is called on all of the shards; not all of them have to do work
#      Starting processing of 0 items from 1
#      Starting processing of 0 items from 0
# 6. data_processor (the handler) is invoked with the output of __iter__
#      Handler invoked with data 17
# 6a. InputReader.__iter__ generates the work items
# 6b. MapperWorkerCallbackHandler.process_data() streams:
#     - work items from InputReader.__iter__ into handler_spec
#     - results from handler_spec into OutputWriter.write()
# 7. As the shards complete their work, they POST to /mapreduce/worker_callback,
#    which the mapreduce library handles in third_party.mapreduce.handlers.MapperWorkerCallbackHandler
#      server.py:593] default: "POST /mapreduce/worker_callback HTTP/1.1" 200 -
# 8. When all shards have completed, there is a POST to /mapreduce/controller_callback,
#    which the mapreduce library handles in third_party.mapreduce.handlers.ControllerCallbackHandler
#      INFO 2013-07-25 17:31:20,680 server.py:593] default: "POST /mapreduce/worker_callback HTTP/1.1" 200 -
#      INFO 2013-07-25 17:31:21,062 handlers.py:449] Final result for job '1580509570902BA1D5DAA' is 'success'
# 8a. Calls ControllerCallbackHandler._finalize_job(), which calls output_writer_class().finalize_job(mapreduce_state)
# 8b. FileOutputWriterBase.finalize_job() closes all the files and returns the blob_keys
# 9. I modified the mapreduce status UI to output the final blob key
# 10. Download that blob and, voila, you've got RAW_DATA'! (a read-back helper is
#     sketched right after these notes)
#       0'1'2'3'4'5'6'7'8'9'10'11'12'13'14'15'16'17'18'
############
# Or, using the CustomMultiFileOutputWriter:
# 8. When all shards have completed, there is a POST to /mapreduce/controller_callback,
#    which the mapreduce library handles in third_party.mapreduce.handlers.ControllerCallbackHandler
# 8a. Calls CustomMultiFileOutputWriter._finalize_job(),
# 8b. CustomMultiFileOutputWriter.finalize_job() closes all the files and returns the blob_keys
# 9. I modified the mapreduce status UI to output the final blob keys
# 10. You have two files which contain, respectively:
#       0'2'4'6'8'10'12'14'16'18'
#       1'3'5'7'9'11'13'15'17'
# And, for parallelized reduce: | |
# 6b. CustomParallelReduce: | |
# Coallate data written into smaller chunks | |
# At job finalization, launch another MapReduce, which | |
# Splits intermediate data | |
# Finally, writes into blob files with CustomMultiFileOutputWriter | |
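

# The helper below is a sketch added for illustration (not part of the
# original gist): it reads the finalized blob from step 10 back out of
# blobstore so you can verify its contents, assuming the standard
# google.appengine.ext.blobstore API.
def _read_output_blob(blob_key):
    """Return the raw contents of a finalized output blob.

    For the single-file FileOutputWriter run described above, this would
    return something like "0'1'2'...18'".
    """
    from google.appengine.ext import blobstore
    return blobstore.BlobReader(blob_key).read()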


class StartAdvancedMapreduce(request_handler.RequestHandler):

    @user_util.manual_access_checking  # Shhhh
    def get(self):
        # Kick off the mapreduce
        mapreduce_id = third_party.mapreduce.control.start_map(
            name="AdvancedMapreduce",
            # Responsible for splitting input based on input_reader params
            reader_spec=(
                "advanced_mapreduce.CustomMapper"),
            # Responsible for processing input from the InputReader and
            # outputting data to the OutputWriter
            handler_spec='advanced_mapreduce.data_processor',
            # Responsible for writing output
            # Single-file writing:
            # output_writer_spec=(
            #     'third_party.mapreduce.output_writers.FileOutputWriter'),
            # Multi-file writing (custom):
            output_writer_spec=(
                'advanced_mapreduce.CustomMultiFileOutputWriter'),
            mapper_parameters={
                "input_reader": {
                    "foo": 'bar'
                },
                # Required by FileOutputWriterBase
                "output_writer": {
                    # "output_sharding": "input",  # Only needed for FileOutputWriter;
                    # the default is 'none', which is what we usually want
                    "filesystem": 'blobstore',
                    # OR
                    # "filesystem": 'gs',
                    # "gs_bucket_name": 'bobthebucket',
                    # "gs_acl": "project-private",  # Optional, project-private is the default
                },
                "processing_rate": 128,
            },
            mapreduce_parameters={
                # "done_callback": "/devadmin/content/backfill_done"
            },
            shard_count=32,
            queue_name="backfill-mapreduce-queue")

        logging.error('Starting mapreduce job %s' % mapreduce_id)
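

# Sketch added for illustration (not part of the original gist): instead of
# modifying the mapreduce status UI (step 9 in the notes above), the finalized
# output filenames can be pulled from the completed job's state directly.
def _get_output_filenames(mapreduce_id):
    """Return the output filenames recorded by the writer for a finished job."""
    import third_party.mapreduce.model
    state = third_party.mapreduce.model.MapreduceState.get_by_job_id(
        mapreduce_id)
    return CustomMultiFileOutputWriter.get_filenames(state)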


class CustomMapper(third_party.mapreduce.input_readers.InputReader):

    def __init__(self, start, step, current=0):
        self._start = start
        self._step = step
        # Index in the resulting filtered list we are currently traversing
        self._current = current
        logging.error('CustomMapper init with %i, %i, %i'
                      % (start, step, current))

    def __iter__(self):
        my_data = RAW_DATA[self._start::self._step]
        logging.error(
            'Starting processing of %i items from %i'
            % (self._current, len(my_data)))

        for data in my_data[self._current:]:
            self._current += 1
            # TODO(mattfaus): What if we are interrupted right here?
            yield data

    @classmethod
    def split_input(cls, mapper_spec):
        """Divide mapping work evenly among a set of input readers.

        This function is called once to create a set of CustomMapper
        instances that will each traverse its own distinct slice of
        RAW_DATA. Each reader starts at a different offset and steps
        forward by N, where N is the number of readers (shards).
        """
        params = third_party.mapreduce.input_readers._get_params(mapper_spec)
        logging.error('split_input got %s' % mapper_spec)
        logging.error('split_input got params %s' % params)

        shard_count = mapper_spec.shard_count
        mr_input_readers = [cls(idx, shard_count)
                            for idx in xrange(0, shard_count)]
        return mr_input_readers

    @classmethod
    def validate(cls, mapper_spec):
        logging.error('validate() got %s' % mapper_spec)

    @classmethod
    def from_json(cls, json):
        logging.error('deserializing')
        return cls(json["start"], json["step"], json["current"])

    def to_json(self):
        logging.error('serializing')
        return {"start": self._start, "step": self._step,
                "current": self._current}


# TODO(mattfaus): Play with input_reader.expand_parameters in process_data()


class CustomMultiFileOutputWriter(
        third_party.mapreduce.output_writers.FileOutputWriter):
    """Creates multiple output files."""

    @classmethod
    def _get_all_filenames(cls, mapreduce_state_or_ctx):
        """Generates filenames for this mapreduce job.

        The generator pattern is used for fun; it could just return a list.

        Arguments:
            mapreduce_state_or_ctx - may either be a model.MapreduceState
                or a context.Context object. Thankfully, the members we
                are interested in have the same name in both cases.
        """
        mr_name = mapreduce_state_or_ctx.mapreduce_spec.name
        mr_id = mapreduce_state_or_ctx.mapreduce_spec.mapreduce_id

        file_suffixes = ['-even', '-odd']
        for suffix in file_suffixes:
            yield "%s-%s%s" % (mr_name, mr_id, suffix)

    @classmethod
    def init_job(cls, mapreduce_state):
        """Initialize job-level writer state.

        We must create all files that we will be writing to at the beginning,
        otherwise we would risk multiple instances of
        CustomMultiFileOutputWriter running on separate shards concurrently
        creating the same files. At least, I think that's right.

        You could have each shard generate its own set of files, and then
        stitch them together in finalize_job(), but that might get hairy.

        Note: Most of this code is copied from FileOutputWriterBase.init_job()

        Args:
            mapreduce_state: an instance of model.MapreduceState describing
                the current job.
        """
        logging.error('CustomMultiFileOutputWriter.init_job() called %s'
                      % mapreduce_state)

        try:
            # output_sharding = cls._get_output_sharding(mapreduce_state=mapreduce_state)
            mapper_spec = mapreduce_state.mapreduce_spec.mapper
            params = third_party.mapreduce.output_writers._get_params(mapper_spec)
            mime_type = params.get("mime_type", "application/octet-stream")
            filesystem = cls._get_filesystem(mapper_spec=mapper_spec)
            # bucket = params.get(cls.GS_BUCKET_NAME_PARAM)
            acl = params.get(cls.GS_ACL_PARAM, "project-private")

            filenames = []
            request_filenames = []
            for filename in cls._get_all_filenames(mapreduce_state):
                # if bucket is not None:
                #     filename = "%s/%s" % (bucket, filename)
                request_filenames.append(filename)

                system_filename = cls._create_file(
                    filesystem, filename, mime_type, acl=acl)
                logging.error('Created file %s as %s', filename, system_filename)
                filenames.append(system_filename)

            mapreduce_state.writer_state = cls._State(
                filenames, request_filenames).to_json()
            # Note: FileOutputWriterBase.get_filenames() returns these filenames,
            # so if you do anything different you may have to override that
            # function as well.
        except Exception, e:
            logging.error('CustomMultiFileOutputWriter.init_job() '
                          'unhandled exception %s' % e)
    def write(self, data, ctx):
        """Write data.

        Args:
            data: actual data yielded from handler. Type is writer-specific.
            ctx: an instance of context.Context.
        """
        try:
            logging.error('custom writer.write invoked with %s' % data)
            if ctx.get_pool("file_pool") is None:
                ctx.register_pool(
                    "file_pool",
                    third_party.mapreduce.output_writers._FilePool(ctx=ctx))

            all_files = self._filenames
            # all_files = self._get_all_filenames(ctx)  # Or, this

            # Figure out which file this data belongs in. The handler emits
            # strings like "17'", so strip the trailing quote to recover the
            # number.
            num = int(data[:-1])
            # if num % 2 == 0:
            #     my_file = all_files[0]
            # else:
            #     my_file = all_files[1]
            my_file = all_files[num % 2]  # ha!

            logging.error('Writing %s to %s' % (data, my_file))
            ctx.get_pool("file_pool").append(my_file, str(data))
        except Exception, e:
            logging.error('write unhandled exception: %s' % e)

    @classmethod
    def from_json(cls, state):
        logging.error('from_json received state %s', state)
        return cls(state["filenames"])

    def to_json(self):
        return {"filenames": self._filenames}

    def __init__(self, filenames):
        self._filenames = filenames

    @classmethod
    def create(cls, mapreduce_state, shard_number):
        """Create a new writer for a shard.

        Args:
            mapreduce_state: an instance of model.MapreduceState describing
                the current job.
            shard_number: shard number as an integer.
        """
        state = cls._State.from_json(mapreduce_state.writer_state)
        return cls(state.filenames)


def data_processor(data):
    """This must be a generator for mapreduce to receive the output."""
    logging.error('Handler invoked with data %s' % data)
    yield '%i\'' % data
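

# --- Sketch added for illustration (not part of the original gist) ----------
# One way to wire up the "parallelized reduce" idea from the notes at the top
# of this file: an output writer whose finalize_job() launches a second
# MapReduce over the intermediate blob files. `reduce_processor` is a
# hypothetical handler you would still need to write; the reader and the
# Files API calls are the stock App Engine ones.
class CustomParallelReduce(CustomMultiFileOutputWriter):

    @classmethod
    def finalize_job(cls, mapreduce_state):
        # Close the intermediate files as usual.
        super(CustomParallelReduce, cls).finalize_job(mapreduce_state)

        from google.appengine.api import files
        blob_keys = [str(files.blobstore.get_blob_key(f))
                     for f in cls.get_filenames(mapreduce_state)]

        # Launch the reduce phase as a second map job over the intermediate
        # output.
        third_party.mapreduce.control.start_map(
            name="AdvancedMapreduce-reduce",
            reader_spec=(
                "third_party.mapreduce.input_readers.BlobstoreLineInputReader"),
            handler_spec="advanced_mapreduce.reduce_processor",  # hypothetical
            output_writer_spec="advanced_mapreduce.CustomMultiFileOutputWriter",
            mapper_parameters={
                "blob_keys": blob_keys,
                "output_writer": {"filesystem": "blobstore"},
            },
            shard_count=8,
            queue_name="backfill-mapreduce-queue")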
----------------------------------------------------------------------
Second file: a rough sketch of parallelizing the reduce step with the
App Engine pipeline library.
# NOTE: import paths for the pipeline library are assumed here; adjust them to
# wherever it lives in your codebase (e.g. third_party.mapreduce.lib.pipeline).
import pipeline
from pipeline import common


# TODO(mattfaus): Rename query_config to job_config
# TODO(mattfaus): Rename OutputWriter
# TODO(mattfaus): Rename Query* to Map*


class DataWriter(object):
    """Used by the QueryDrainerPipeline to coalesce intermediate results
    into their final resting place.
    """

    def write_metadata(self, metadata):
        raise NotImplementedError()

    def write_result(self, data):
        """Writes all data."""
        raise NotImplementedError()

    def finalize(self):
        """Returns the final result. Maybe a blob_key, maybe something else."""
        raise NotImplementedError()


class CheckpointedDataWriter(DataWriter):
    pass
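

# Sketch added for illustration (not part of the original gist): a minimal
# in-memory DataWriter so the examples at the bottom of this file have
# something concrete to return from finalize(); a real implementation would
# stream to blobstore or Google Cloud Storage instead.
class CsvWriter(DataWriter):

    def __init__(self):
        self._rows = []
        self._metadata = None

    def write_metadata(self, metadata):
        self._metadata = metadata

    def write_result(self, data):
        # `data` is the dict produced by QueryPipeline.query().
        self._rows.append(data)

    def finalize(self):
        # Render everything as CSV text; a real writer might return a
        # blob_key instead.
        if not self._rows:
            return ''
        columns = sorted(self._rows[0].keys())
        lines = [','.join(columns)]
        for row in self._rows:
            lines.append(','.join(str(row.get(c, '')) for c in columns))
        return '\n'.join(lines)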


class QueryPipeline(pipeline.Pipeline):
    """Extend this class to process small subsets of data."""

    output_names = ['query_result']

    def run(self, *args, **kwargs):
        """Runs the query, writes it to query_result."""
        try:
            result = self.query(*args, **kwargs)
            self.fill(self.outputs.query_result, result)
        except Exception, e:
            self.handle_exception(e)
        finally:
            yield common.Return(self.pipeline_id)

    def handle_exception(self, e):
        """Raises the first 2 exceptions, gracefully fails on the 3rd. Overridable."""
        # TODO(mattfaus): Raise for retries
        self.fill(self.outputs.query_result, {'errors_sheet': e})

    def query(self, *args, **kwargs):
        """Returns a dict that our DataWriter understands."""
        raise NotImplementedError("Implement in sub-class.")

    @property
    def task_name(self):
        """Override in sub-classes for easier debugging."""
        return 'unknown'


class CoordinatedQueryPipeline(pipeline.Pipeline):

    def get_data_writer(self, query_config):
        """Given query_config, gets the DataWriter."""
        raise NotImplementedError()

    def get_query_pipelines(self, query_config):
        """Given query_config, gets the QueryPipelines.

        Returns:
            An array of QueryPipelines
        """
        # Parse query_config to instantiate QueryPipelines, return
        raise NotImplementedError()

    def get_reduce_pipelines(self, map_results):
        """Given map_results, segment the reduce pipelines.

        Arguments:
            map_results - a list of pipeline_ids
        """

    # An earlier, incomplete draft of run(), left commented out so the
    # version below takes effect:
    # def run(self, query_config):
    #     map_results = []
    #     for q in self.get_query_pipelines():
    #         map_results.append((yield self.get_query_pipelines()))
    #     # map_results is a list of pipeline_id's that contain results
    #     reduce_results =
    #     num_query_results = yield QueryLauncherPipeline(query_config)
    #     yield QueryDrainerPipeline(query_config, num_queued_query_results)
    #     # Or, parallelize the reduction (with a recursive DataWriter)
    #     # parallelization_factor = 10
    #     # for p in range(parallelization_factor):
    #     #     yield QueryDrainerPipeline()

    def run(self, query_config):
        """Starts the producer and consumer pipelines."""
        num_query_results = yield QueryLauncherPipeline(query_config)
        yield QueryDrainerPipeline(query_config, num_query_results)

        # Or, parallelize the reduction (with a recursive DataWriter)
        # parallelization_factor = 10
        # for p in range(parallelization_factor):
        #     yield QueryDrainerPipeline()


class QueryLauncherPipeline(CoordinatedQueryPipeline):

    def run(self, query_config):
        enqueue_results = []
        for query_pipeline in self.get_query_pipelines(query_config):
            # (EnqueuePipeline is not defined in this sketch; the idea is that
            # it adds a pull-queue task whose payload is the child pipeline's id.)
            enqueue_results.append((
                yield EnqueuePipeline(query_pipeline, tag=self.root_pipeline_id,
                                      task_name=query_pipeline.task_name)))

        yield common.Sum(*enqueue_results)


class QueryDrainerPipeline(CoordinatedQueryPipeline):

    def run(self, query_config, num_queued):
        # Must be idempotent, so add checkpointing to DataWriter?
        data_writer = self.get_data_writer(query_config)

        # TODO(mattfaus): How can we guarantee this takes <10min to run?
        # Async pipeline, or launch a background thread?
        dequeued_count = 0
        while dequeued_count < num_queued:
            # Lease a task from the pull-queue (see the leasing sketch below).
            # The task payload is simply a pipeline_id (thanks to QueryPipeline).
            query_pipeline = QueryPipeline.from_id(task.payload)
            data_writer.write_result(query_pipeline.outputs.query_result.value)
            dequeued_count += 1
            # Mark as processed-by self.pipeline_id (for idempotency)

        yield common.Return(data_writer.finalize())
        # Delete all tasks
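

# Sketch added for illustration (not part of the original gist): the leasing
# loop the drainer's comments describe, using the standard pull-queue API. The
# queue name is hypothetical and must match whatever QueryLauncherPipeline
# enqueues into.
def _lease_query_result_ids(queue_name, tag, lease_seconds=60, max_tasks=100):
    """Lease a batch of result tasks and return their pipeline ids."""
    from google.appengine.api import taskqueue
    queue = taskqueue.Queue(queue_name)
    tasks = queue.lease_tasks_by_tag(lease_seconds, max_tasks, tag=tag)
    # Each task payload is a pipeline_id (see QueryPipeline.run above).
    pipeline_ids = [task.payload for task in tasks]
    queue.delete_tasks(tasks)
    return pipeline_ids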


# Example 1 - A simple segmentation and query example
########################################################################


class ExampleQueryPipeline(QueryPipeline):

    def query(self, example_id):
        # TODO(mattfaus): Issue datastore queries, fill in dict, return
        pass


class ExampleCoordinatedQueryPipeline(CoordinatedQueryPipeline):

    def get_data_writer(self, query_config):
        return CsvWriter()  # or a DataStoreWriter(), perhaps?

    def get_query_pipelines(self, query_config):
        for example_id in query_config['ids']:
            yield ExampleQueryPipeline(example_id)


def run_my_query(query_config):
    stage = ExampleCoordinatedQueryPipeline(query_config)
    stage.start()

    # Output the result of DataWriter.finalize() (available once the
    # pipeline has completed)
    print stage.outputs.default.value


# Example 2 - A more complex segmentation and query example
########################################################################


class NewSriCoordinatedQueryPipeline(CoordinatedQueryPipeline):

    def get_data_writer(self, query_config):
        return CsvWriter()

    def get_query_pipelines(self, query_config):
        for coach_id in query_config['coach_ids']:
            yield NewCoachPipeline(coach_id)

            for student_id in get_students(coach_id):
                date_segments = get_date_segments(student_id)
                for start_dt, end_dt in date_segments:
                    yield NewStudentPipeline(student_id, start_dt, end_dt)


class NewCoachPipeline(QueryPipeline):

    def query(self, coach_id):
        # Query for all coach-related data, return as a dict
        pass


class NewStudentPipeline(QueryPipeline):

    def query(self, student_id, start_dt, end_dt):
        # Query for all student-related data, return as a dict
        pass


def generate_sri_data(query_config):
    stage = NewSriCoordinatedQueryPipeline(query_config)
    stage.start()
    print stage.outputs.default.value