class Split(apache_beam.DoFn):
    """Parse one comma-separated text line into a record dict."""

    def process(self, element):
        """Split ``element`` into its country, duration and user fields.

        Args:
            element: A string of exactly three comma-separated fields,
                in the order country, duration, user.

        Returns:
            A one-element list holding a dict with the keys ``'country'``,
            ``'duration'`` (converted to ``float``) and ``'user'``.

        Raises:
            ValueError: If the line does not contain exactly three fields,
                or the duration field cannot be converted to ``float``.
        """
        country, duration, user = element.split(",")
        return [{
            'country': country,
            'duration': float(duration),
            'user': user,
        }]
from abc import ABCMeta, abstractmethod


class ConfigurationPort(metaclass=ABCMeta):
    """Abstract interface for looking up configuration values by key."""

    @abstractmethod
    def get(self, key):
        """Return the configuration value stored under ``key``.

        Concrete subclasses must override this method; the abstract class
        itself cannot be instantiated.
        """
Verifying my Blockstack ID is secured with the address 1NtWtFZB35YPSTSeCfFQiu6uGB5CWvU9dD https://explorer.blockstack.org/address/1NtWtFZB35YPSTSeCfFQiu6uGB5CWvU9dD
class Split(apache_beam.DoFn):
    """Parse one comma-separated text line into a record dict."""

    def process(self, element):
        """Split ``element`` into its country, duration and user fields.

        Args:
            element: A string of exactly three comma-separated fields,
                in the order country, duration, user.

        Returns:
            A one-element list holding a dict with the keys ``'country'``,
            ``'duration'`` (converted to ``float``) and ``'user'``.

        Raises:
            ValueError: If the line does not contain exactly three fields,
                or the duration field cannot be converted to ``float``.
        """
        country, duration, user = element.split(",")
        return [{
            'country': country,
            'duration': float(duration),
            'user': user,
        }]
with apache_beam.Pipeline(options=options) as p:
    # Read the text file and turn every line into a record dict via Split.
    rows = (
        p |
        ReadFromText("input.txt") |
        apache_beam.ParDo(Split())
    )
timings = ( | |
rows | |
class Split(apache_beam.DoFn):
    """Parse one comma-separated text line into a record dict."""

    def process(self, element):
        """Split ``element`` into its country, duration and user fields.

        Args:
            element: A string of exactly three comma-separated fields,
                in the order country, duration, user.

        Returns:
            A one-element list holding a dict with the keys ``'country'``,
            ``'duration'`` (converted to ``float``) and ``'user'``.

        Raises:
            ValueError: If the line does not contain exactly three fields,
                or the duration field cannot be converted to ``float``.
        """
        country, duration, user = element.split(",")
        return [{
            'country': country,
            'duration': float(duration),
            'user': user,
        }]
with apache_beam.Pipeline(options=options) as p:
    # Read the file named by input_filename and turn every line into a
    # record dict via Split.
    rows = (
        p |
        ReadFromText(input_filename) |
        apache_beam.ParDo(Split())
    )
{
    "Spain (ES)": [2.2, 2.9],
    "United kingdom (UK)": [4.2]
}
# Average duration per country: emit (country, duration) pairs, group them
# by country, then reduce each group's durations to their mean.
timings = (
    rows |
    apache_beam.ParDo(CollectTimings()) |
    "Grouping timings" >> apache_beam.GroupByKey() |
    "Calculating average" >> apache_beam.CombineValues(
        apache_beam.combiners.MeanCombineFn()
    )
)
users = ( |
class CollectTimings(apache_beam.DoFn):
    """Turn a record dict into a (country, duration) key/value pair."""

    def process(self, element):
        """
        Returns a list of tuples containing country and duration

        Args:
            element: A mapping that has ``'country'`` and ``'duration'``
                keys.

        Raises:
            KeyError: If either key is missing from ``element``.
        """
        result = [
            (element['country'], element['duration'])
        ]
        # The list must be returned; otherwise process() yields None and
        # nothing is emitted downstream.
        return result
# Join the two keyed collections under the tags 'timings' and 'users',
# format each joined entry with WriteToCSV, and write the result to
# output_filename.
to_be_joined = (
    {
        'timings': timings,
        'users': users
    } |
    apache_beam.CoGroupByKey() |
    apache_beam.ParDo(WriteToCSV()) |
    WriteToText(output_filename)
)