Skip to content

Instantly share code, notes, and snippets.

View brunoripa's full-sized avatar

Bruno Ripa brunoripa

View GitHub Profile
from abc import ABCMeta, abstractmethod
class ConfigurationPort(metaclass=ABCMeta):
    """Abstract port (interface) for reading configuration values.

    Concrete adapters subclass this and implement :meth:`get`. Because
    ``get`` is declared with ``@abstractmethod`` under ``ABCMeta``, the
    port itself (or any subclass that does not override ``get``) cannot be
    instantiated.
    """

    @abstractmethod
    def get(self, key):
        """Return the configuration value stored under *key*.

        The accepted key type and the behaviour for a missing key are left
        to the concrete implementation. The base body does nothing and
        returns ``None``.
        """
Verifying my Blockstack ID is secured with the address 1NtWtFZB35YPSTSeCfFQiu6uGB5CWvU9dD https://explorer.blockstack.org/address/1NtWtFZB35YPSTSeCfFQiu6uGB5CWvU9dD

class Split(apache_beam.DoFn):
    """Parse one ``country,duration,user`` text line into a dict."""

    def process(self, element):
        """Split *element* on commas and return it as a one-row list.

        Args:
            element: a text line with exactly three comma-separated fields.

        Returns:
            A single-element list holding a dict with keys ``'country'``,
            ``'duration'`` (converted to ``float``) and ``'user'``.

        Raises:
            ValueError: if the line does not split into exactly three
                fields, or the duration field is not a valid number.
        """
        # Plain str.split: quoted fields containing commas are not supported.
        country, duration, user = element.split(",")
        return [{
            'country': country,
            'duration': float(duration),
            'user': user,
        }]

# NOTE(review): incomplete fragment as shown. The statements below are not
# indented under the `with`, so this does not parse, and the `timings = (`
# expression at the end is never closed. Treat it as an excerpt only.
with apache_beam.Pipeline(options=options) as p:
rows = (
p |
ReadFromText("input.txt") |
apache_beam.ParDo(Split())
)
timings = (
rows |
class Split(apache_beam.DoFn):
    """Parse one ``country,duration,user`` text line into a dict."""

    def process(self, element):
        """Split *element* on commas and return it as a one-row list.

        Args:
            element: a text line with exactly three comma-separated fields.

        Returns:
            A single-element list holding a dict with keys ``'country'``,
            ``'duration'`` (converted to ``float``) and ``'user'``.

        Raises:
            ValueError: if the line does not split into exactly three
                fields, or the duration field is not a valid number.
        """
        # Plain str.split: quoted fields containing commas are not supported.
        country, duration, user = element.split(",")
        return [{
            'country': country,
            'duration': float(duration),
            'user': user,
        }]
with apache_beam.Pipeline(options=options) as p:
    # Read the input file and parse each line with Split into a dict with
    # 'country', 'duration' and 'user' keys. Steps are labeled so they are
    # identifiable in the pipeline graph, like the other labeled steps.
    rows = (
        p
        | "Reading input" >> ReadFromText(input_filename)
        | "Splitting rows" >> apache_beam.ParDo(Split())
    )
{
"Spain (ES)": [2.2, 2.9],
"United kingdom (UK)": [4.2]
}
# Per-country average duration: CollectTimings emits (country, duration)
# pairs, then the mean is taken per key. CombinePerKey produces the same
# (key, mean) output as GroupByKey followed by CombineValues, but as a single
# combine transform it lets runners pre-combine values before the shuffle
# ("combiner lifting") instead of shipping every value to one group.
timings = (
    rows
    | "Collecting timings" >> apache_beam.ParDo(CollectTimings())
    | "Calculating average" >> apache_beam.CombinePerKey(
        apache_beam.combiners.MeanCombineFn()
    )
)
# NOTE(review): truncated fragment — the expression that builds `users` is not
# shown here, and the opening parenthesis is never closed.
users = (
class CollectTimings(apache_beam.DoFn):
    """Turn a parsed row into a ``(country, duration)`` key/value pair."""

    def process(self, element):
        """
        Returns a list of tuples containing country and duration.

        Args:
            element: a dict with at least ``'country'`` and ``'duration'``
                keys (as produced by Split).

        Returns:
            A one-element list ``[(country, duration)]``, keyed by country
            so downstream per-key transforms can group on it.
        """
        # The previous version built this list but never returned it, so
        # process() returned None and the DoFn emitted no elements at all.
        result = [
            (element['country'], element['duration'])
        ]
        return result
# Join `timings` and `users` on their shared key with CoGroupByKey
# (presumably the country — confirm against the `users` pipeline), then pass
# each joined group through the WriteToCSV DoFn and write the resulting
# elements to `output_filename`.
to_be_joined = (
    {'timings': timings, 'users': users}
    | apache_beam.CoGroupByKey()
    | apache_beam.ParDo(WriteToCSV())
    | WriteToText(output_filename)
)