Created
September 13, 2011 19:32
-
-
Save Dierk/1214832 to your computer and use it in GitHub Desktop.
Classic concurrent producer-consumer problem with using a kanban system to avoid buffer overflows when consumers are slow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import groovyx.gpars.dataflow.DataFlowQueue | |
import groovyx.gpars.dataflow.operator.DataFlowPoisson | |
import static groovyx.gpars.dataflow.DataFlow.operator | |
import java.util.concurrent.atomic.AtomicInteger | |
def upstream = new DataFlowQueue() // empty trays travel back upstream to the producer | |
def downstream = new DataFlowQueue() // trays with products travel to the consumer downstream | |
def prodWiring = [inputs: [upstream], outputs: [downstream], maxForks: 3 ] // maxForks is optional | |
def consWiring = [inputs: [downstream], outputs: [upstream], maxForks: 3 ] // maxForks is optional | |
class Tray { int card; def product } | |
int wip = prodWiring.maxForks + consWiring.maxForks // work in progress == max # of products in the system | |
wip.times { upstream << new Tray(card: it) } // put empty trays in the system along with its kanban card | |
def product = new AtomicInteger(0) // a dummy example product; could be anything | |
def soMany = 1000 | |
operator prodWiring, { tray -> | |
def prod = product.andIncrement // producer is used concurrently: be careful with shared state | |
if (prod > soMany) { // we do not want to produce endlessly in this example | |
downstream << DataFlowPoisson.instance // let the consumer finish his work, then stop | |
return | |
} | |
def zero = tray.card ? '' : "\n" // new line for tray number zero | |
print "$zero[$tray.card:$prod] " // visualize production point | |
tray.product = prod // put product in tray | |
downstream << tray // send tray with product inside to consumer | |
} | |
def consumer = operator consWiring, { tray -> | |
print " $tray.card:$tray.product " // visualize product consumption and card association | |
tray.product == null // optionally remove product from tray | |
upstream << tray // send empty tray back upstream | |
} | |
consumer.join() // wait for the overall example to finish |
updated the example to keep the card/product association stable without introducing synchronization. Cards are replaced with trays, which contain the product - inspired by having gone through too many airport security checks.
Description of the KanbanFlow pattern available at http://people.canoo.com/mittie/kanbanflow.html
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
groovy 1.8.2, gpars 0.11