Last active
February 20, 2018 21:39
-
-
Save sangheestyle/8c533c69f86f62166491b92df47056c8 to your computer and use it in GitHub Desktop.
Generic and non-generic versions of MapReduce (from item 22, "Use @classmethod polymorphism to construct objects generically", in Effective Python)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class InputData:
    """Abstract source of raw input data for a worker.

    Concrete subclasses must override read().
    """

    def read(self):
        """Return this input's data; the base class does not implement it."""
        raise NotImplementedError
class PathInputData(InputData):
    """InputData whose bytes come from a single file on disk."""

    def __init__(self, path):
        super().__init__()
        self.path = path  # file opened in binary mode by read()

    def read(self):
        """Return the complete contents of ``self.path`` as bytes."""
        with open(self.path, 'rb') as stream:
            payload = stream.read()
        return payload
class Worker:
    """Abstract unit of map/reduce work bound to one input.

    ``result`` starts out as None; concrete subclasses override map() and
    reduce() to fill it in and to combine another worker's result into it.
    """

    def __init__(self, input_data):
        self.input_data = input_data  # consumed by a subclass's map()
        self.result = None  # set by a subclass's map()

    def map(self):
        """Process ``self.input_data``; subclasses must override."""
        raise NotImplementedError

    def reduce(self, other):
        """Merge ``other`` into this worker; subclasses must override."""
        raise NotImplementedError
class LineCountWorker(Worker):
    """Worker that counts newline bytes in its input and sums counts on reduce."""

    def map(self):
        """Store the number of newline bytes in the input as ``result``."""
        self.result = self.input_data.read().count(b'\n')

    def reduce(self, other):
        """Add ``other``'s newline count into this worker's ``result``."""
        self.result += other.result
# Orchestration by helper functions
import os | |
def generate_inputs(data_dir):
    """Yield a PathInputData for every regular file directly inside data_dir.

    Subdirectories and other non-regular entries are skipped: PathInputData
    opens its path with open(..., 'rb'), which fails on a directory, and that
    failure would happen inside a worker thread.
    """
    for name in os.listdir(data_dir):
        path = os.path.join(data_dir, name)
        if os.path.isfile(path):
            yield PathInputData(path)
def create_workers(input_list):
    """Wrap every item of ``input_list`` in a LineCountWorker.

    ``input_list`` may be any iterable (for example a generator); it is
    consumed exactly once. Returns a list of workers in input order.
    """
    return [LineCountWorker(input_data) for input_data in input_list]
from threading import Thread | |
def execute(workers):
    """Run map() on every worker concurrently, then reduce into the first one.

    Each worker's map() runs on its own Thread. Once all threads have been
    joined, every other worker is reduced into the first worker in order,
    and that worker's ``result`` is returned.

    Raises:
        ValueError: if ``workers`` is empty (there is nothing to reduce into).
        Exception: the first recorded exception raised by any worker's map().
            An exception escaping a bare Thread target is only printed by
            threading.excepthook and never reaches the caller; it would
            otherwise resurface later as an unrelated error in reduce()
            (e.g. combining a ``None`` result).
    """
    workers = list(workers)  # accept any iterable; indexed and sliced below
    if not workers:
        raise ValueError('execute() requires at least one worker')

    errors = []  # exceptions captured from map() in worker threads

    def run_map(worker):
        try:
            worker.map()
        except Exception as exc:  # re-raised on the calling thread below
            errors.append(exc)

    threads = [Thread(target=run_map, args=(w,)) for w in workers]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    if errors:
        raise errors[0]

    first, rest = workers[0], workers[1:]
    for worker in rest:
        first.reduce(worker)
    return first.result
def mapreduce(data_dir):
    """Run the line-counting pipeline over ``data_dir`` and return its result.

    Builds one input per directory entry, one LineCountWorker per input,
    and hands the workers to execute() for the threaded map/reduce.
    """
    return execute(create_workers(generate_inputs(data_dir)))
import random | |
def write_test_files(tmpdir):
    """Populate ``tmpdir`` with 100 files named '0' through '99'.

    Each file contains between 0 and 100 newline characters, chosen with
    random.randint, giving the pipeline a random but checkable workload.
    """
    for index in range(100):
        target = os.path.join(tmpdir, str(index))
        with open(target, 'w') as out_file:
            newline_count = random.randint(0, 100)
            out_file.write('\n' * newline_count)
from tempfile import TemporaryDirectory | |
# Demo run: fill a scratch directory with random files, then print the total
# newline count reported by the threaded MapReduce pipeline.
with TemporaryDirectory() as tmpdir:
    write_test_files(tmpdir)
    result = mapreduce(data_dir=tmpdir)
    print(result)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class InputData:
    """Abstract input source that also knows how to enumerate its instances.

    Concrete subclasses override read() to return data and the
    generate_inputs() classmethod to create inputs from a config mapping.
    """

    def read(self):
        """Return this input's data; not implemented in the base class."""
        raise NotImplementedError

    @classmethod
    def generate_inputs(cls, config):
        """Create inputs from ``config``; not implemented in the base class."""
        raise NotImplementedError
import os | |
class PathInputData(InputData):
    """InputData backed by a single file on disk."""

    def __init__(self, path):
        super().__init__()
        self.path = path  # file opened in binary mode by read()

    def read(self):
        """Return the complete contents of ``self.path`` as bytes."""
        with open(self.path, 'rb') as handle:
            return handle.read()

    @classmethod
    def generate_inputs(cls, config):
        """Yield one ``cls`` per regular file directly in config['data_dir'].

        Subdirectories and other non-regular entries are skipped, because
        read() opens its path as a file and would fail on a directory
        (inside a worker thread).

        Raises KeyError (when iteration starts) if ``config`` has no
        'data_dir' entry.
        """
        data_dir = config['data_dir']
        for name in os.listdir(data_dir):
            path = os.path.join(data_dir, name)
            if os.path.isfile(path):
                yield cls(path)
class Worker(object):
    """Abstract unit of map/reduce work bound to one input.

    ``result`` starts out as None; concrete subclasses override map() and
    reduce(). create_workers() builds workers generically from any input
    class that provides a generate_inputs(config) classmethod.
    """

    def __init__(self, input_data):
        self.input_data = input_data  # consumed by a subclass's map()
        self.result = None  # set by a subclass's map()

    def map(self):
        """Process ``self.input_data``; subclasses must override."""
        raise NotImplementedError

    def reduce(self, other):
        """Merge ``other`` into this worker; subclasses must override."""
        raise NotImplementedError

    @classmethod
    def create_workers(cls, input_class, config):
        """Return a list with one ``cls`` instance per generated input.

        Inputs come from ``input_class.generate_inputs(config)``, so any
        Worker subclass can be paired with any input class.
        """
        inputs = input_class.generate_inputs(config)
        return [cls(input_data) for input_data in inputs]
class LineCountWorker(Worker):
    """Worker that counts newline bytes in its input and sums counts on reduce."""

    def map(self):
        """Store the number of newline bytes in the input as ``result``."""
        self.result = self.input_data.read().count(b'\n')

    def reduce(self, other):
        """Add ``other``'s newline count into this worker's ``result``."""
        self.result += other.result
from threading import Thread | |
def execute(workers):
    """Run map() on every worker concurrently, then reduce into the first one.

    Each worker's map() runs on its own Thread. Once all threads have been
    joined, every other worker is reduced into the first worker in order,
    and that worker's ``result`` is returned.

    Raises:
        ValueError: if ``workers`` is empty (there is nothing to reduce into).
        Exception: the first recorded exception raised by any worker's map().
            An exception escaping a bare Thread target is only printed by
            threading.excepthook and never reaches the caller; it would
            otherwise resurface later as an unrelated error in reduce()
            (e.g. combining a ``None`` result).
    """
    workers = list(workers)  # accept any iterable; indexed and sliced below
    if not workers:
        raise ValueError('execute() requires at least one worker')

    errors = []  # exceptions captured from map() in worker threads

    def run_map(worker):
        try:
            worker.map()
        except Exception as exc:  # re-raised on the calling thread below
            errors.append(exc)

    threads = [Thread(target=run_map, args=(w,)) for w in workers]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    if errors:
        raise errors[0]

    first, rest = workers[0], workers[1:]
    for worker in rest:
        first.reduce(worker)
    return first.result
def mapreduce(worker_class, input_class, config):
    """Run a generic MapReduce job and return the reduced result.

    ``worker_class.create_workers`` builds one worker per input that
    ``input_class`` generates from ``config``; execute() then runs the
    threaded map phase and the reduce phase.
    """
    return execute(worker_class.create_workers(input_class, config))
import random | |
def write_test_files(tmpdir):
    """Populate ``tmpdir`` with 100 files named '0' through '99'.

    Each file contains between 0 and 100 newline characters, chosen with
    random.randint, giving the pipeline a random but checkable workload.
    """
    for index in range(100):
        target = os.path.join(tmpdir, str(index))
        with open(target, 'w') as out_file:
            newline_count = random.randint(0, 100)
            out_file.write('\n' * newline_count)
from tempfile import TemporaryDirectory | |
# Demo run: fill a scratch directory with random files, then let the generic
# MapReduce entry point count their newlines and print the total.
with TemporaryDirectory() as tmpdir:
    write_test_files(tmpdir)
    config = dict(data_dir=tmpdir)
    result = mapreduce(
        worker_class=LineCountWorker,
        input_class=PathInputData,
        config=config,
    )
    print(result)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import abc | |
class InputData(abc.ABC):
    """Abstract input source whose interface is enforced by abc.ABC.

    A subclass cannot be instantiated until it implements both read() and
    the generate_inputs() classmethod.
    """

    @abc.abstractmethod
    def read(self):
        """Return this input's data."""

    @classmethod
    @abc.abstractmethod
    def generate_inputs(cls, config):
        """Create inputs of this class from ``config``."""
import os | |
class PathInputData(InputData):
    """InputData backed by a single file on disk."""

    def __init__(self, path):
        super().__init__()
        self.path = path  # file opened in binary mode by read()

    def read(self):
        """Return the complete contents of ``self.path`` as bytes."""
        with open(self.path, 'rb') as handle:
            return handle.read()

    @classmethod
    def generate_inputs(cls, config):
        """Yield one ``cls`` per regular file directly in config['data_dir'].

        Subdirectories and other non-regular entries are skipped, because
        read() opens its path as a file and would fail on a directory
        (inside a worker thread).

        Raises KeyError (when iteration starts) if ``config`` has no
        'data_dir' entry.
        """
        data_dir = config['data_dir']
        for name in os.listdir(data_dir):
            path = os.path.join(data_dir, name)
            if os.path.isfile(path):
                yield cls(path)
class Worker(abc.ABC):
    """Abstract unit of map/reduce work bound to one input.

    ``result`` starts out as None. Subclasses must implement map() and
    reduce() before they can be instantiated. create_workers() builds
    workers generically from any input class that provides a
    generate_inputs(config) classmethod.
    """

    def __init__(self, input_data):
        self.input_data = input_data  # consumed by a subclass's map()
        self.result = None  # set by a subclass's map()

    @abc.abstractmethod
    def map(self):
        """Process ``self.input_data``."""

    @abc.abstractmethod
    def reduce(self, other):
        """Merge ``other`` into this worker."""

    @classmethod
    def create_workers(cls, input_class, config):
        """Return a list with one ``cls`` instance per generated input.

        Inputs come from ``input_class.generate_inputs(config)``, so any
        Worker subclass can be paired with any input class.
        """
        inputs = input_class.generate_inputs(config)
        return [cls(input_data) for input_data in inputs]
class LineCountWorker(Worker):
    """Worker that counts newline bytes in its input and sums counts on reduce."""

    def map(self):
        """Store the number of newline bytes in the input as ``result``."""
        self.result = self.input_data.read().count(b'\n')

    def reduce(self, other):
        """Add ``other``'s newline count into this worker's ``result``."""
        self.result += other.result
from threading import Thread | |
def execute(workers):
    """Run map() on every worker concurrently, then reduce into the first one.

    Each worker's map() runs on its own Thread. Once all threads have been
    joined, every other worker is reduced into the first worker in order,
    and that worker's ``result`` is returned.

    Raises:
        ValueError: if ``workers`` is empty (there is nothing to reduce into).
        Exception: the first recorded exception raised by any worker's map().
            An exception escaping a bare Thread target is only printed by
            threading.excepthook and never reaches the caller; it would
            otherwise resurface later as an unrelated error in reduce()
            (e.g. combining a ``None`` result).
    """
    workers = list(workers)  # accept any iterable; indexed and sliced below
    if not workers:
        raise ValueError('execute() requires at least one worker')

    errors = []  # exceptions captured from map() in worker threads

    def run_map(worker):
        try:
            worker.map()
        except Exception as exc:  # re-raised on the calling thread below
            errors.append(exc)

    threads = [Thread(target=run_map, args=(w,)) for w in workers]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    if errors:
        raise errors[0]

    first, rest = workers[0], workers[1:]
    for worker in rest:
        first.reduce(worker)
    return first.result
def mapreduce(worker_class, input_class, config):
    """Run a generic MapReduce job and return the reduced result.

    ``worker_class.create_workers`` builds one worker per input that
    ``input_class`` generates from ``config``; execute() then runs the
    threaded map phase and the reduce phase.
    """
    return execute(worker_class.create_workers(input_class, config))
import random | |
def write_test_files(tmpdir):
    """Populate ``tmpdir`` with 100 files named '0' through '99'.

    Each file contains between 0 and 100 newline characters, chosen with
    random.randint, giving the pipeline a random but checkable workload.
    """
    for index in range(100):
        target = os.path.join(tmpdir, str(index))
        with open(target, 'w') as out_file:
            newline_count = random.randint(0, 100)
            out_file.write('\n' * newline_count)
from tempfile import TemporaryDirectory | |
# Demo run: fill a scratch directory with random files, then let the generic
# MapReduce entry point count their newlines and print the total.
with TemporaryDirectory() as tmpdir:
    write_test_files(tmpdir)
    config = dict(data_dir=tmpdir)
    result = mapreduce(
        worker_class=LineCountWorker,
        input_class=PathInputData,
        config=config,
    )
    print(result)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment