class Split(apache_beam.DoFn):
    """Parse one comma-separated text line into a record dict."""

    def process(self, element):
        """Split ``element`` on commas into country, duration and user.

        Args:
            element: a single line of text expected to hold exactly three
                comma-separated fields: ``country,duration,user``.

        Returns:
            A one-element list holding a dict with keys ``'country'`` (str),
            ``'duration'`` (float) and ``'user'`` (str).

        Raises:
            ValueError: if the line does not split into exactly three fields,
                or if the duration field cannot be parsed by ``float``.
        """
        country, duration, user = element.split(",")
        return [{
            'country': country,
            'duration': float(duration),
            'user': user
        }]
from abc import ABCMeta, abstractmethod


class ConfigurationPort(metaclass=ABCMeta):
    """Abstract interface for looking up configuration values by key.

    Concrete implementations must override :meth:`get`; the class itself
    cannot be instantiated because ``get`` is declared abstract.
    """

    @abstractmethod
    def get(self, key):
        """Return the configuration value stored under ``key``.

        The base implementation does nothing (returns ``None``); subclasses
        define the actual lookup and what happens for unknown keys.
        """
        pass
Verifying my Blockstack ID is secured with the address 1NtWtFZB35YPSTSeCfFQiu6uGB5CWvU9dD https://explorer.blockstack.org/address/1NtWtFZB35YPSTSeCfFQiu6uGB5CWvU9dD
class Split(apache_beam.DoFn):
    """Parse one comma-separated text line into a record dict."""

    def process(self, element):
        """Split ``element`` on commas into country, duration and user.

        Args:
            element: a single line of text expected to hold exactly three
                comma-separated fields: ``country,duration,user``.

        Returns:
            A one-element list holding a dict with keys ``'country'`` (str),
            ``'duration'`` (float) and ``'user'`` (str).

        Raises:
            ValueError: if the line does not split into exactly three fields,
                or if the duration field cannot be parsed by ``float``.
        """
        country, duration, user = element.split(",")
        return [{
            'country': country,
            'duration': float(duration),
            'user': user
        }]
with apache_beam.Pipeline(options=options) as p:
    # Read "input.txt" line by line and turn each line into a record dict
    # with the Split DoFn.
    rows = (
        p
        | ReadFromText("input.txt")
        | apache_beam.ParDo(Split())
    )
    # NOTE(review): the snippet this was extracted from is cut off here. It
    # continued with "timings = (rows ...", but the rest of that expression
    # is not visible, so it is not reproduced as code. TODO restore it.
class Split(apache_beam.DoFn):
    """Parse one comma-separated text line into a record dict."""

    def process(self, element):
        """Split ``element`` on commas into country, duration and user.

        Args:
            element: a single line of text expected to hold exactly three
                comma-separated fields: ``country,duration,user``.

        Returns:
            A one-element list holding a dict with keys ``'country'`` (str),
            ``'duration'`` (float) and ``'user'`` (str).

        Raises:
            ValueError: if the line does not split into exactly three fields,
                or if the duration field cannot be parsed by ``float``.
        """
        country, duration, user = element.split(",")
        return [{
            'country': country,
            'duration': float(duration),
            'user': user
        }]
with apache_beam.Pipeline(options=options) as p:
    # Read the file named by input_filename line by line and turn each line
    # into a record dict with the Split DoFn.
    rows = (
        p
        | ReadFromText(input_filename)
        | apache_beam.ParDo(Split())
    )
{
    "Spain (ES)": [2.2, 2.9],
    "United kingdom (UK)": [4.2]
}
# Average duration per country: key each row by country with CollectTimings
# (expected to emit (country, duration) pairs), group the pairs by key, then
# average each group's values with MeanCombineFn.
timings = (
    rows
    | apache_beam.ParDo(CollectTimings())
    | "Grouping timings" >> apache_beam.GroupByKey()
    | "Calculating average" >> apache_beam.CombineValues(
        apache_beam.combiners.MeanCombineFn()
    )
)
# NOTE(review): the snippet this was extracted from is cut off here. It
# continued with "users = (", but the rest of that expression is not
# visible, so it is not reproduced as code. TODO restore it.
class CollectTimings(apache_beam.DoFn):
    """Key each parsed record by country, keeping its duration as the value."""

    def process(self, element):
        """
        Returns a list of tuples containing country and duration

        Args:
            element: a mapping with at least ``'country'`` and ``'duration'``
                keys.

        Returns:
            A one-element list ``[(country, duration)]``.

        Raises:
            KeyError: if ``element`` lacks either key.
        """
        result = [
            (element['country'], element['duration'])
        ]
        # Each item of the returned iterable becomes an output element. Without
        # this return the DoFn emitted nothing downstream.
        return result
# Join the `timings` and `users` collections on their keys with CoGroupByKey.
# Each joined element is then passed to WriteToCSV (presumably formatting it as
# a CSV line; its definition is not visible here), and the results are written
# out with WriteToText using output_filename.
# NOTE(review): the join assumes `users` is keyed the same way as `timings`
# (by country) — confirm against the definition of `users`. Also note that
# `to_be_joined` holds the output of WriteToText, not the joined collection.
to_be_joined = (
    {
        'timings': timings,
        'users': users
    }
    | apache_beam.CoGroupByKey()
    | apache_beam.ParDo(WriteToCSV())
    | WriteToText(output_filename)
)