@alixedi
Last active August 14, 2023 05:31
ARM Coding Challenge - Production Line - by Ali Zaidi
# -*- coding: utf-8 -*-
"""
conveyor.conveyor
~~~~~~~~~~~~~~~~~
Models a conveyor belt in software.
Even for a problem as interesting as this, I only had about an hour.
Therefore, when weighing my options amongst:
1. Discrete event simulation
2. Dangerous concoction of coroutines
3. Master-slave with cooperative scheduling
I went for 3 because simpy sucks (only library I know that went from OO to
procedural) and I have never used coroutines in a serious project.
On a side note, it would have been much more fun if I had some test data
to go with the problem description. Generally, such models are based on
some real-world scenarios and therefore can be tested statistically against
observations.
Finally, I do not think that a source that emits an item after a random time
period (Poisson arrival) is the same as a uniform distribution of A, B and
Empty. Strictly speaking. However, the problem description switches from
one to the other between paragraphs.
P.S. This code has been tested on Python 2.7.5. It does not have any
dependencies apart from the standard library.
Enjoy.
:copyright: (c) 2015 by Ali Zaidi.
:license: BSD
"""
import random


class Worker(object):
    """A class that implements a worker at the conveyor belt.

    A worker has 4 properties:
        name: Solid debugging gold!
        parts_reqd: Parts required to start making a product.
        time_reqd: Time required to finish a product.
        slave: The 'other' worker on the station.

    A worker has 3 states:
        scanning: If a worker does not have the parts_reqd, he looks
            at the parts arriving at the conveyor belt. If he finds
            something he needs, he picks it up and places it in the cache.
            Otherwise, he passes control to the junior developer.. er slave
            worker.
        working: Once a worker gets all the parts he needs, he starts
            working. The worker finishes a product in time_reqd. While he
            is working, he graciously hands over the reins to the slave
            worker.
        submitting: After finishing a product, a worker looks for an empty
            slot in the conveyor belt to put his product. As always, if the
            slot is not forthcoming, control is passed to the slave worker.
    """

    def __init__(self, name, parts_reqd, time_reqd, slave=None):
        self.name = name
        self.parts_reqd = parts_reqd
        self.time_reqd = time_reqd
        self.slave = slave
        self.cache = []
        self.clk = time_reqd

    def run(self, stage):
        """The container for the worker (Conveyor class in this case) will
        call this function for each tick of the simulated clock."""
        # --- working ---
        # If all items in parts_reqd are in cache!
        # See: http://stackoverflow.com/a/3932429/1182202
        if all([r in self.cache for r in self.parts_reqd]):
            # --- submitting ---
            # Finished working?
            if self.clk <= 0:
                print '<%s> P ready.' % self.name
                if stage.val == '':
                    self.cache = []
                    self.clk = self.time_reqd
                    print '<%s> Putting P.' % self.name
                    stage.val = 'P (%s)' % self.name
                else:
                    if self.slave:
                        self.slave.run(stage)
            else:
                self.clk -= 1
                if self.slave:
                    self.slave.run(stage)
        # --- scanning ---
        # Missing some parts_reqd?
        else:
            # Found something we do not have?
            if stage.val not in self.cache and stage.val in self.parts_reqd:
                # Put it in the cache!
                print '<%s> Picking up %s.' % (self.name, stage.val)
                self.cache.append(stage.val)
                stage.val = ''
            else:
                if self.slave:
                    self.slave.run(stage)


class Source(object):
    """A class that implements a source. For instance, in the given problem,
    a source would emit one of 'A', 'B' or '' with equal probability.

    A source has a single property:
        items: The list of possible items that the source could emit.
    """

    def __init__(self, items):
        self.items = items

    def new(self):
        """Returns a random item amongst the given items using uniform
        distribution. Override this method to produce more interesting
        distributions."""
        return random.choice(self.items)
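
The docstring of Source.new suggests overriding it for other distributions. Below is a minimal sketch of one way that could look, not part of the original solution. WeightedSource, its weights argument and its rng argument are hypothetical names introduced here, and Source is repeated as a stand-in so the block runs on its own. The syntax is chosen to work on both Python 2.7 and Python 3.

```python
import random


class Source(object):
    """Stand-in for the Source class above, repeated so this runs alone."""

    def __init__(self, items):
        self.items = items

    def new(self):
        return random.choice(self.items)


class WeightedSource(Source):
    """Hypothetical subclass: emits items with unequal probabilities."""

    def __init__(self, items, weights, rng=None):
        super(WeightedSource, self).__init__(items)
        if len(weights) != len(items):
            raise ValueError('need exactly one weight per item')
        self.weights = weights
        # A private Random instance keeps runs reproducible when seeded.
        self.rng = rng if rng is not None else random.Random()

    def new(self):
        # Walk the cumulative weights until the random draw is covered.
        draw = self.rng.uniform(0, sum(self.weights))
        running = 0.0
        for item, weight in zip(self.items, self.weights):
            running += weight
            if draw < running:
                return item
        return self.items[-1]  # guard against floating-point round-off


# Empty slots are twice as likely as either raw material in this example.
source = WeightedSource(['A', 'B', ''], [1, 1, 2], rng=random.Random(42))
sample = [source.new() for _ in range(1000)]
```

Because only new() changes, an instance could be handed to Conveyor in place of Source(['A', 'B', '']).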


class Stage(object):
    """Python passes object references by value and strings are immutable,
    so a worker cannot empty or fill a belt slot through a plain string
    argument. This class wraps the value held at the respective conveyor
    belt stage. In return for this hack, we get a significantly more
    succinct run function for the Worker class.
    """

    def __init__(self, val):
        self.val = val
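
To illustrate why the Stage wrapper is needed, here is a small sketch contrasting a belt of plain strings with a belt of wrapped values. The two helper functions are hypothetical and exist only for this demonstration. Stage is repeated as a stand-in so the block runs on its own.

```python
class Stage(object):
    """Stand-in for the Stage class above, repeated so this runs alone."""

    def __init__(self, val):
        self.val = val


def take_from_string(slot):
    # Rebinding the parameter only changes the local name.
    slot = ''


def take_from_stage(stage):
    # Mutating the shared object is visible to the caller.
    stage.val = ''


belt_of_strings = ['A', 'B']
take_from_string(belt_of_strings[0])   # the belt still holds 'A'

belt_of_stages = [Stage('A'), Stage('B')]
take_from_stage(belt_of_stages[0])     # the first slot is now empty
```

This is the behaviour Worker.run relies on when it sets stage.val to pick up a part or put down a product.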


class Analyzer(object):
    """Implements aggregations on the conveyor belt output. Override for
    more fancy stuff."""

    def __init__(self):
        self.data = {'P': 0, 'A': 0, 'B': 0}

    def run(self, out):
        """Analyze the output."""
        if len(out.val) > 0:
            key = out.val[0]
            self.data[key] += 1

    def report(self):
        """Reports on the aggregates."""
        print 'Total Products: %d' % self.data['P']
        print 'Total Unpicked: %d' % (self.data['A'] + self.data['B'])


class Conveyor(object):
    """A class that implements a conveyor belt.

    A conveyor belt has:
        stages: Wrappers for a value. Value can be one of 'A', 'B', '' or
            a product such as 'P (M0)'.
        source: Uniform RNG producing 'A', 'B' and '' with equal probability.
        workers: Workers that consume raw materials 'A' and 'B' and
            produce product 'P'. Configuration determined by a factory
            function.
        analyzer: Simplistic and rather daft. Match of the day type.

    The real action happens inside run - particularly the line that moves the
    pipeline is "da bomb".
    """

    def __init__(self, source, analyzer, stages=3):
        # One Stage object per slot. [Stage('')] * stages would repeat a
        # single shared object in every slot.
        self.stages = [Stage('') for _ in range(stages)]
        self.source = source
        self.workers = Conveyor.worker_factory(self.stages)
        self.analyzer = analyzer

    @staticmethod
    def worker_factory(stages):
        """This is the default factory for making workers. It returns two
        workers for each stage in the conveyor belt. The workers have the
        same characteristics. Override for something more clever."""
        workers = {}
        for i, _ in enumerate(stages):
            # Two workers either side of the conveyor belt
            workers[i] = Worker('M%d' % i,
                                ['A', 'B'],
                                3,
                                Worker('S%d' % i,
                                       ['A', 'B'],
                                       3))
        return workers

    def printer(self, t):
        """Prints something useful. Default is to print the tick number and
        the contents of every stage on the conveyor belt. Override for
        something more clever."""
        print '%d: %s' % (t, ' | '.join([s.val for s in self.stages]))

    def run(self, until=100):
        """Start the factory!"""
        for t in range(until):
            print '\n' + '------' * len(self.stages)
            # move the belt
            self.stages = [Stage(self.source.new())] + self.stages[:-1]
            # print the belt
            self.printer(t)
            # for each stage, invoke run for the workers
            for i, s in enumerate(self.stages):
                self.workers[i].run(s)
            # print the belt
            self.printer(t)
            # Send the output to analyzer
            self.analyzer.run(self.stages[-1])
        print '\n' + '------' * len(self.stages)
        self.analyzer.report()


def main():
    Conveyor(Source(['A', 'B', '']),
             Analyzer(),
             stages=3).run()


if __name__ == "__main__":
    main()
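
The belt shift in Conveyor.run builds a new list on every tick. As an alternative sketch, and not the approach the code above takes, a fixed-length collections.deque gives the same shift and also hands back the item that falls off the end. shift_belt is a hypothetical helper, and plain strings stand in for Stage objects to keep the example short.

```python
from collections import deque


def shift_belt(belt, new_item):
    """Push new_item onto the front; return whatever fell off the end."""
    dropped = belt[-1] if len(belt) == belt.maxlen else None
    # With maxlen set, appendleft discards the rightmost item when full.
    belt.appendleft(new_item)
    return dropped


belt = deque([''] * 3, maxlen=3)
dropped = [shift_belt(belt, item) for item in ['A', 'B', '', 'A']]
```

After these four shifts the first 'A' has travelled the whole belt and is the only non-empty item dropped.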