Skip to content

Instantly share code, notes, and snippets.

@sangheestyle
Last active February 20, 2018 21:39
Show Gist options
  • Save sangheestyle/8c533c69f86f62166491b92df47056c8 to your computer and use it in GitHub Desktop.
Save sangheestyle/8c533c69f86f62166491b92df47056c8 to your computer and use it in GitHub Desktop.
Generic and non-generic versions of map-reduce (from Item 22, "Use @classmethod Polymorphism to Construct Objects Generically", in Effective Python)
class InputData:
    """Interface for an input source that workers read from."""

    def read(self):
        """Subclasses override this; the base version always raises."""
        raise NotImplementedError
class PathInputData(InputData):
    """Input whose data is the raw bytes of the file at ``path``."""

    def __init__(self, path):
        super().__init__()
        self.path = path

    def read(self):
        """Return the full contents of ``self.path`` as bytes."""
        with open(self.path, 'rb') as source:
            contents = source.read()
        return contents
class Worker:
    """Interface for one unit of map/reduce work bound to a single input."""

    def __init__(self, input_data):
        # ``result`` stays None until a subclass's map() fills it in.
        self.result = None
        self.input_data = input_data

    def map(self):
        """Subclasses override this; the base version always raises."""
        raise NotImplementedError

    def reduce(self, other):
        """Subclasses override this; the base version always raises."""
        raise NotImplementedError
class LineCountWorker(Worker):
    """Worker that counts newline bytes in its input."""

    def map(self):
        """Store the number of newline bytes in the input as ``result``."""
        self.result = self.input_data.read().count(b'\n')

    def reduce(self, other):
        """Add ``other.result`` onto this worker's own ``result``."""
        self.result += other.result
# Orchestration by helper functions
import os
def generate_inputs(data_dir):
    """Lazily yield a PathInputData for every entry listed in ``data_dir``."""
    yield from (
        PathInputData(os.path.join(data_dir, entry))
        for entry in os.listdir(data_dir)
    )
def create_workers(input_list):
    """Return a list holding one LineCountWorker per item of ``input_list``."""
    return [LineCountWorker(item) for item in input_list]
from threading import Thread
def execute(workers):
    """Run each worker's map step in its own thread, then fold the results.

    Args:
        workers: Sequence of objects exposing ``map()``, ``reduce(other)``
            and a ``result`` attribute.

    Returns:
        The ``result`` of the first worker after every other worker has been
        reduced into it, or ``None`` when ``workers`` is empty.
    """
    if not workers:
        # No first worker to reduce into (e.g. an empty input directory).
        return None
    threads = [Thread(target=worker.map) for worker in workers]
    for thread in threads:
        thread.start()
    # Wait for every map step to finish before touching any result.
    for thread in threads:
        thread.join()
    first, rest = workers[0], workers[1:]
    for worker in rest:
        first.reduce(worker)
    return first.result
def mapreduce(data_dir):
    """Build inputs and workers for ``data_dir`` and return execute()'s result."""
    return execute(create_workers(generate_inputs(data_dir)))
import random
def write_test_files(tmpdir, num_files=100, max_lines=100):
    """Populate ``tmpdir`` with files that contain only newline characters.

    Files are named ``0`` .. ``num_files - 1``; each one holds a random
    number of newlines between 0 and ``max_lines`` inclusive.

    Args:
        tmpdir: Path of an existing directory to write into.
        num_files: How many files to create (default 100).
        max_lines: Inclusive upper bound on newlines per file (default 100).
    """
    for i in range(num_files):
        with open(os.path.join(tmpdir, str(i)), 'w') as handle:
            handle.write('\n' * random.randint(0, max_lines))
from tempfile import TemporaryDirectory
# Demo: fill a throwaway directory with test files and run the pipeline on it.
with TemporaryDirectory() as tmpdir:
    write_test_files(tmpdir)
    result = mapreduce(tmpdir)

print(result)
class InputData:
    """Interface for an input source that can also enumerate its instances."""

    def read(self):
        """Subclasses override this; the base version always raises."""
        raise NotImplementedError

    @classmethod
    def generate_inputs(cls, config):
        """Subclasses override this; the base version always raises."""
        raise NotImplementedError
import os
class PathInputData(InputData):
    """Input whose data is the raw bytes of the file at ``path``."""

    def __init__(self, path):
        super().__init__()
        self.path = path

    def read(self):
        """Return the full contents of ``self.path`` as bytes."""
        with open(self.path, 'rb') as source:
            contents = source.read()
        return contents

    @classmethod
    def generate_inputs(cls, config):
        """Lazily yield one instance per entry of ``config['data_dir']``."""
        directory = config['data_dir']
        for entry in os.listdir(directory):
            full_path = os.path.join(directory, entry)
            yield cls(full_path)
class Worker:
    """Interface for one unit of map/reduce work bound to a single input."""

    def __init__(self, input_data):
        # ``result`` stays None until a subclass's map() fills it in.
        self.result = None
        self.input_data = input_data

    def map(self):
        """Subclasses override this; the base version always raises."""
        raise NotImplementedError

    def reduce(self, other):
        """Subclasses override this; the base version always raises."""
        raise NotImplementedError

    @classmethod
    def create_workers(cls, input_class, config):
        """Return a list with one ``cls`` instance per generated input.

        Inputs come from ``input_class.generate_inputs(config)``.
        """
        return [cls(item) for item in input_class.generate_inputs(config)]
class LineCountWorker(Worker):
    """Worker that counts newline bytes in its input."""

    def map(self):
        """Read the input and record how many newline bytes it contains."""
        contents = self.input_data.read()
        newline_total = contents.count(b'\n')
        self.result = newline_total

    def reduce(self, other):
        """Add ``other.result`` onto this worker's own ``result``."""
        self.result += other.result
from threading import Thread
def execute(workers):
    """Run each worker's map step in its own thread, then fold the results.

    Args:
        workers: Sequence of objects exposing ``map()``, ``reduce(other)``
            and a ``result`` attribute.

    Returns:
        The ``result`` of the first worker after every other worker has been
        reduced into it, or ``None`` when ``workers`` is empty.
    """
    if not workers:
        # No first worker to reduce into (e.g. an empty input directory).
        return None
    threads = [Thread(target=worker.map) for worker in workers]
    for thread in threads:
        thread.start()
    # Wait for every map step to finish before touching any result.
    for thread in threads:
        thread.join()
    first, rest = workers[0], workers[1:]
    for worker in rest:
        first.reduce(worker)
    return first.result
def mapreduce(worker_class, input_class, config):
    """Create workers via ``worker_class`` and return execute()'s result.

    ``input_class`` and ``config`` are passed straight through to
    ``worker_class.create_workers``.
    """
    return execute(worker_class.create_workers(input_class, config))
import random
def write_test_files(tmpdir, num_files=100, max_lines=100):
    """Populate ``tmpdir`` with files that contain only newline characters.

    Files are named ``0`` .. ``num_files - 1``; each one holds a random
    number of newlines between 0 and ``max_lines`` inclusive.

    Args:
        tmpdir: Path of an existing directory to write into.
        num_files: How many files to create (default 100).
        max_lines: Inclusive upper bound on newlines per file (default 100).
    """
    for i in range(num_files):
        with open(os.path.join(tmpdir, str(i)), 'w') as handle:
            handle.write('\n' * random.randint(0, max_lines))
from tempfile import TemporaryDirectory
# Demo: run the generic pipeline over a throwaway directory of test files.
with TemporaryDirectory() as tmpdir:
    write_test_files(tmpdir)
    # The input class locates its files through the 'data_dir' config entry.
    config = {'data_dir': tmpdir}
    result = mapreduce(LineCountWorker, PathInputData, config)

print(result)
import abc
class InputData(abc.ABC):
    """Abstract input source; concrete subclasses implement both methods."""

    @abc.abstractmethod
    def read(self):
        """Abstract: no base behavior (returns None if called via super)."""

    @classmethod
    @abc.abstractmethod
    def generate_inputs(cls, config):
        """Abstract: no base behavior (returns None if called via super)."""
import os
class PathInputData(InputData):
    """Input whose data is the raw bytes of the file at ``path``."""

    def __init__(self, path):
        super().__init__()
        self.path = path

    def read(self):
        """Return the full contents of ``self.path`` as bytes."""
        with open(self.path, 'rb') as stream:
            payload = stream.read()
        return payload

    @classmethod
    def generate_inputs(cls, config):
        """Lazily yield one instance per entry of ``config['data_dir']``."""
        root = config['data_dir']
        for filename in os.listdir(root):
            yield cls(os.path.join(root, filename))
class Worker(abc.ABC):
    """Abstract unit of map/reduce work bound to a single input."""

    def __init__(self, input_data):
        # ``result`` stays None until a subclass's map() fills it in.
        self.result = None
        self.input_data = input_data

    @abc.abstractmethod
    def map(self):
        """Abstract: no base behavior (returns None if called via super)."""

    @abc.abstractmethod
    def reduce(self, other):
        """Abstract: no base behavior (returns None if called via super)."""

    @classmethod
    def create_workers(cls, input_class, config):
        """Return a list with one ``cls`` instance per generated input.

        Inputs come from ``input_class.generate_inputs(config)``.
        """
        return [cls(item) for item in input_class.generate_inputs(config)]
class LineCountWorker(Worker):
    """Worker that counts newline bytes in its input."""

    def map(self):
        """Store the number of newline bytes in the input as ``result``."""
        raw = self.input_data.read()
        self.result = raw.count(b'\n')

    def reduce(self, other):
        """Add ``other.result`` onto this worker's own ``result``."""
        self.result += other.result
from threading import Thread
def execute(workers):
    """Run each worker's map step in its own thread, then fold the results.

    Args:
        workers: Sequence of objects exposing ``map()``, ``reduce(other)``
            and a ``result`` attribute.

    Returns:
        The ``result`` of the first worker after every other worker has been
        reduced into it, or ``None`` when ``workers`` is empty.
    """
    if not workers:
        # No first worker to reduce into (e.g. an empty input directory).
        return None
    threads = [Thread(target=worker.map) for worker in workers]
    for thread in threads:
        thread.start()
    # Wait for every map step to finish before touching any result.
    for thread in threads:
        thread.join()
    first, rest = workers[0], workers[1:]
    for worker in rest:
        first.reduce(worker)
    return first.result
def mapreduce(worker_class, input_class, config):
    """Create workers via ``worker_class`` and return execute()'s result.

    ``input_class`` and ``config`` are passed straight through to
    ``worker_class.create_workers``.
    """
    return execute(worker_class.create_workers(input_class, config))
import random
def write_test_files(tmpdir, num_files=100, max_lines=100):
    """Populate ``tmpdir`` with files that contain only newline characters.

    Files are named ``0`` .. ``num_files - 1``; each one holds a random
    number of newlines between 0 and ``max_lines`` inclusive.

    Args:
        tmpdir: Path of an existing directory to write into.
        num_files: How many files to create (default 100).
        max_lines: Inclusive upper bound on newlines per file (default 100).
    """
    for i in range(num_files):
        with open(os.path.join(tmpdir, str(i)), 'w') as handle:
            handle.write('\n' * random.randint(0, max_lines))
from tempfile import TemporaryDirectory
# Demo: run the ABC-based pipeline over a throwaway directory of test files.
with TemporaryDirectory() as tmpdir:
    write_test_files(tmpdir)
    # The input class locates its files through the 'data_dir' config entry.
    config = {'data_dir': tmpdir}
    result = mapreduce(LineCountWorker, PathInputData, config)

print(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment