Skip to content

Instantly share code, notes, and snippets.

View imrenagi's full-sized avatar
🏠
Take a look at my go-payment lib!

Imre Nagi imrenagi

🏠
Take a look at my go-payment lib!
View GitHub Profile
package mapreduce
import (
"fmt"
"sync"
)
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
// mapF splits contents into words and emits one KeyValue{Key: word, Value: "1"}
// for every occurrence, in input order (word-count style map function).
// A "word" is a maximal run of Unicode letters; every other rune (digits,
// punctuation, whitespace) acts as a separator. filename is not used, but it is
// kept so the function matches the map-function signature expected by doMap.
func mapF(filename string, contents string) []mapreduce.KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool {
		return !unicode.IsLetter(r)
	})
	// Exactly one KeyValue per word, so allocate the backing array once
	// instead of letting append grow it repeatedly.
	kvs := make([]mapreduce.KeyValue, 0, len(words))
	for _, w := range words {
		kvs = append(kvs, mapreduce.KeyValue{Key: w, Value: "1"})
	}
	return kvs
}
func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTask int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {
var intermediary map[string][]string = make(map[string][]string)
//decode input file for reduce job
func doMap(
jobName string, // the name of the MapReduce job
mapTask int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(filename string, contents string) []KeyValue,
) {
//read file
byteContent, err := ioutil.ReadFile(inFile)
# Compile the Maven project and run com.example.myclass through exec:java,
# passing the Dataflow pipeline options below as the program arguments.
# Replace [YOUR_PROJECT_ID], [YOUR_BUCKET_NAME] and <template-name> before running.
# The backslash-newlines inside the double-quoted -Dexec.args value are removed
# by the shell, so all options reach the program as one space-separated string.
mvn compile exec:java \
-Dexec.mainClass=com.example.myclass \
-Dexec.args="--runner=DataflowRunner \
--project=[YOUR_PROJECT_ID] \
--stagingLocation=gs://[YOUR_BUCKET_NAME]/staging \
--templateLocation=gs://[YOUR_BUCKET_NAME]/templates/<template-name>"
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
PCollection<String> items = ...;
// Assign each element to a one-minute fixed window, and keep accepting
// elements that arrive up to two days late for that window.
Window<String> oneMinuteWindows =
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .withAllowedLateness(Duration.standardDays(2));
PCollection<String> fixedWindowedItems = items.apply(oneMinuteWindows);
// Group the two input PCollections into one PCollectionList. Nothing is merged
// yet; this only bundles them so a single transform can take both as input.
// NOTE(review): aCollection and bCollection are defined elsewhere and are
// assumed to be PCollection<String>.
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
// Flatten the list into a single PCollection containing the elements of both inputs.
PCollection<String> mergedCollectionWithFlatten = collectionList.apply(Flatten.<String>pCollections());
PCollection<String> names = ...;
// Filter step "FooTrans": pass through only the names that begin with "Foo";
// all other elements are dropped.
PCollection<String> fooCollection = names.apply("FooTrans", ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext context) {
        String name = context.element();
        if (!name.startsWith("Foo")) {
            return;
        }
        context.output(name);
    }
}));
// Two String-typed output tags, presumably used to split the multi-output
// ParDo that produces mixedCollection into a "data" output and an "error" output.
// The trailing {} makes each tag an anonymous subclass, so the String type
// argument survives Java's type erasure and can be read at runtime.
final TupleTag<String> dataTag = new TupleTag<String>() {};
final TupleTag<String> errorTag = new TupleTag<String>() {};
PCollectionTuple mixedCollection =
names.apply(ParDo
.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().startsWith("Foo")) {
c.output(c.element());