|
import apache_beam as beam |
|
import random |
|
import numpy as np |
|
import typing |
|
|
|
|
|
class GenerateSamples(beam.DoFn):
    """Generate one pseudo-random point per input seed.

    Every coordinate is ``random() * maximum_radius``, so all generated
    points have non-negative coordinates.

    Args:
        maximum_radius: Scale factor applied to each uniform sample.
        dimensions: Number of coordinates in each generated point.
    """

    def __init__(self, maximum_radius, dimensions):
        super().__init__()
        self.radius = maximum_radius
        self.dimensions = dimensions

    def process(self, elm):
        """Yield a single point, as a tuple of floats, seeded by ``elm``.

        Args:
            elm: Seed for the generator; equal seeds produce equal points.

        Yields:
            A tuple with ``self.dimensions`` float coordinates.
        """
        # A private generator produces the same sequence as
        # ``random.seed(elm)`` followed by ``random.random()`` calls, but
        # does not overwrite the process-wide ``random`` state.
        rng = random.Random(elm)
        # Emit a concrete tuple instead of a lazy generator: a generator can
        # be consumed only once and cannot be pickled should the element
        # ever need to be materialized between steps.
        yield tuple(rng.random() * self.radius for _ in range(self.dimensions))
|
|
|
|
|
def individual_circle_montecarlo(samples, radius=1, dimensions=2):
    """Run the element-wise Monte Carlo pipeline and print its estimate.

    One point is generated per sample, each point is tested against the
    ball of the given radius, and the number of hits is summed. The hit
    ratio is then scaled by the volume of the enclosing cube and printed;
    nothing is returned.

    Args:
        samples: Number of random points to generate (also used as seeds
            ``0 .. samples - 1``).
        radius: Radius of the ball being measured.
        dimensions: Number of coordinates per point.
    """
    squared_radius = radius * radius
    # Points are drawn with non-negative coordinates only, i.e. from a cube
    # of side ``radius`` covering one orthant of the ball. Scaling the hit
    # ratio by the full enclosing cube, (2 * radius) ** dimensions, yields
    # the ball's volume. For two dimensions this equals 4 * radius ** 2.
    bounding_volume = (2 * radius) ** dimensions

    with beam.Pipeline() as p:
        result = (
            p
            | beam.Create(list(range(samples)))
            # Generate random points in an n-dimensional space.
            | beam.ParDo(GenerateSamples(radius, dimensions))
            # Verify if the points are within the circle.
            | beam.Map(
                lambda x: (
                    1 if sum(dim * dim for dim in x) <= squared_radius else 0
                )
            )
            | beam.CombineGlobally(sum)
        )
        result | "final_ratio" >> beam.Map(
            lambda tot: print(
                "pablito", tot, samples, bounding_volume * tot / samples
            )
        )
|
|
|
|
|
class BatchedGenerateSamples(beam.DoFn):
    """Generate random points in an n-dimensional space.

    This DoFn consumes a batch of seeds and produces a batch holding the
    squared coordinates of one random point per seed.

    Args:
        dimensions: Number of coordinates in each generated point.
        radius: Scale factor applied to each uniform sample.
    """

    def __init__(self, dimensions, radius):
        super().__init__()
        self.dimensions = dimensions
        self.radius = radius

    def process_batch(self, seeds: np.ndarray) -> typing.Iterator[np.ndarray]:
        """Yield the squared coordinates for a batch of random points.

        Args:
            seeds: Batch of input elements. Only its shape is used, so the
                seed values do not influence the output and results are
                not reproducible across runs.

        Yields:
            A float array of shape ``(*seeds.shape, self.dimensions)``
            holding each coordinate already squared.
        """
        radiuses = np.random.rand(*seeds.shape, self.dimensions) * self.radius
        # Square here so the downstream step only has to sum along the
        # coordinate axis to obtain each point's squared distance.
        yield radiuses * radiuses

    def infer_output_type(self, input_element_type):
        """Return the dtype of the produced values.

        The generated coordinates are floating point regardless of the
        input element type, so float64 is reported.
        """
        return np.float64
|
|
|
|
|
class BatchedSumAndCheck(beam.DoFn):
    """Verify if the points are within the circle.

    This is a DoFn that consumes batches, but yields individual elements.

    Args:
        radius: Radius of the ball the points are tested against.
    """

    def __init__(self, radius):
        super().__init__()
        self.radius = radius

    @beam.DoFn.yields_elements
    def process_batch(self, radiuses: np.ndarray) -> typing.Iterator[int]:
        """Yield how many points of the batch fall inside the ball.

        Args:
            radiuses: Array whose rows are summed along axis 1 and compared
                with ``radius ** 2``; the rows are therefore expected to
                hold squared coordinates.

        Yields:
            A single Python ``int``: the number of rows whose sum does not
            exceed the squared radius.
        """
        sums = radiuses.sum(axis=1)
        # Count in NumPy instead of iterating the boolean array with the
        # builtin ``sum``, and convert to a plain int to match the declared
        # element type. The comparison is inclusive, like the element-wise
        # pipeline's check.
        in_or_out = int(np.count_nonzero(sums <= self.radius * self.radius))
        yield in_or_out
|
|
|
|
|
def batched_circle_montecarlo(samples: int, radius=1, dimensions=2):
    """Run the batched Monte Carlo pipeline and print its estimate.

    Seeds are fed through batch-consuming DoFns that generate the points
    and count how many fall inside the ball. The hit ratio is then scaled
    by the volume of the enclosing cube and printed; nothing is returned.

    Args:
        samples: Number of random points to generate.
        radius: Radius of the ball being measured.
        dimensions: Number of coordinates per point.
    """
    # Points are drawn with non-negative coordinates only, i.e. from a cube
    # of side ``radius`` covering one orthant of the ball. Scaling the hit
    # ratio by the full enclosing cube, (2 * radius) ** dimensions, yields
    # the ball's volume. For two dimensions this equals 4 * radius ** 2.
    bounding_volume = (2 * radius) ** dimensions

    with beam.Pipeline() as p:
        result = (
            p
            | beam.Create(list(range(samples))).with_output_types(np.int64)
            # Forward the caller's settings rather than fixed constants so
            # the sampled space matches the scale factor used below.
            | beam.ParDo(BatchedGenerateSamples(dimensions, radius))
            | beam.ParDo(BatchedSumAndCheck(radius))
            | beam.CombineGlobally(sum)
        )
        result | "final_ratio" >> beam.Map(
            lambda tot: print(
                "pablito", tot, samples, bounding_volume * tot / samples
            )
        )
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run both pipelines on the same number of samples
    # and report how long each one took.
    import time

    samples = 800000

    # perf_counter() is monotonic and high resolution, so the measured
    # intervals are not affected by system clock adjustments.
    s1 = time.perf_counter()
    individual_circle_montecarlo(samples)
    print("took ", time.perf_counter() - s1, "seconds individually")

    s2 = time.perf_counter()
    batched_circle_montecarlo(samples)
    print("took ", time.perf_counter() - s2, "seconds batchedly")