This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package mapreduce | |
import ( | |
"fmt" | |
"sync" | |
) | |
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) { | |
var ntasks int |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func mapF(filename string, contents string) []mapreduce.KeyValue { | |
tokens := strings.FieldsFunc(contents, func(r rune) bool { | |
return !unicode.IsLetter(r) | |
}) | |
kvs := make([]mapreduce.KeyValue, 0) | |
for _, val := range tokens { | |
kvs = append(kvs, mapreduce.KeyValue{Key: val, Value: "1"}) | |
} | |
return kvs | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func doReduce( | |
jobName string, // the name of the whole MapReduce job | |
reduceTask int, // which reduce task this is | |
outFile string, // write the output here | |
nMap int, // the number of map tasks that were run ("M" in the paper) | |
reduceF func(key string, values []string) string, | |
) { | |
var intermediary map[string][]string = make(map[string][]string) | |
//decode input file for reduce job |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func doMap( | |
jobName string, // the name of the MapReduce job | |
mapTask int, // which map task this is | |
inFile string, | |
nReduce int, // the number of reduce task that will be run ("R" in the paper) | |
mapF func(filename string, contents string) []KeyValue, | |
) { | |
//read file | |
byteContent, err := ioutil.ReadFile(inFile) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Runs the Maven `compile` and `exec:java` goals, launching the main class
# com.example.myclass with the pipeline options packed into -Dexec.args.
# The options select DataflowRunner and supply a project id, a GCS staging
# path and a GCS template path.
# Bracketed values ([YOUR_PROJECT_ID], [YOUR_BUCKET_NAME]) and <template-name>
# are placeholders to replace before running.
# NOTE(review): presumably --templateLocation makes the runner stage a
# reusable template instead of executing the job immediately -- confirm
# against the Dataflow documentation.
mvn compile exec:java \ | |
-Dexec.mainClass=com.example.myclass \ | |
-Dexec.args="--runner=DataflowRunner \ | |
--project=[YOUR_PROJECT_ID] \ | |
--stagingLocation=gs://[YOUR_BUCKET_NAME]/staging \ | |
--templateLocation=gs://[YOUR_BUCKET_NAME]/templates/<template-name>" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"os" | |
"os/signal" | |
"syscall" | |
"time" | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Input collection; the `...` initializer is a placeholder, not real code.
PCollection<String> items = ...; | |
// Assign the elements of `items` to fixed windows of one minute each, with an
// allowed lateness of two days configured on the windowing transform.
PCollection<String> fixedWindowedItems = items.apply( | |
Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))) | |
.withAllowedLateness(Duration.standardDays(2))); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Group the two PCollections (aCollection and bCollection, both defined
// outside this snippet) into a single PCollectionList.
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection); | |
// Apply Flatten.pCollections() to the list, yielding one PCollection<String>.
PCollection<String> mergedCollectionWithFlatten = collectionList.apply(Flatten.<String>pCollections()); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Input collection; the `...` initializer is a placeholder, not real code.
PCollection<String> names = ...; | |
// Apply a ParDo step named "FooTrans" whose DoFn re-emits an element only
// when it starts with "Foo"; every other element produces no output.
PCollection<String> fooCollection = names.apply("FooTrans", ParDo.of(new DoFn<String, String>(){ | |
@ProcessElement | |
public void processElement(ProcessContext c) { | |
if(c.element().startsWith("Foo")){ | |
c.output(c.element()); | |
} | |
}})); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
final TupleTag<String> dataTag = new TupleTag<String>() {}; | |
final TupleTag<String> errorTag = new TupleTag<String>() {}; | |
PCollectionTuple mixedCollection = | |
names.apply(ParDo | |
.of(new DoFn<String, String>() { | |
@ProcessElement | |
public void processElement(ProcessContext c) { | |
if (c.element().startsWith("Foo")) { | |
c.output(c.element()); |
NewerOlder