@macg33zr
Forked from Dierk/KanbanDemo.groovy
Created March 20, 2012 09:57
Classic concurrent producer-consumer problem, using a kanban system to avoid buffer overflows when consumers are slow
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.operator.PoisonPill
import static groovyx.gpars.dataflow.Dataflow.operator
import java.util.concurrent.atomic.AtomicInteger
def upstream = new DataflowQueue() // empty trays travel back upstream to the producer
def downstream = new DataflowQueue() // trays with products travel to the consumer downstream
def prodWiring = [inputs: [upstream], outputs: [downstream], maxForks: 3 ] // maxForks is optional
def consWiring = [inputs: [downstream], outputs: [upstream], maxForks: 3 ] // maxForks is optional
class Tray { int card; def product }
int wip = prodWiring.maxForks + consWiring.maxForks // work in progress == max # of products in the system
wip.times { upstream << new Tray(card: it) } // put empty trays in the system, each with its kanban card
def product = new AtomicInteger(0) // a dummy example product; could be anything
def soMany = 20
operator prodWiring, { tray ->
    def prod = product.getAndIncrement() // producer is used concurrently: be careful with shared state
    if (prod > soMany) { // we do not want to produce endlessly in this example
        downstream << PoisonPill.instance // let the consumer finish its work, then stop
        return
    }
    def zero = tray.card ? '' : "\n" // new line for tray number zero
    print "$zero[$tray.card:$prod] " // visualize production point
    tray.product = prod // put product in tray
    downstream << tray // send tray with product inside to consumer
}
def consumer = operator consWiring, { tray ->
    // If I make the consumer slow, not all of the products made get consumed
    sleep 2000
    print " $tray.card:$tray.product " // visualize product consumption and card association
    tray.product = null // optionally remove product from tray
    upstream << tray // send empty tray back upstream
}
consumer.join() // wait for the overall example to finish
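
The comment inside the consumer notes that a slow consumer leaves some products unconsumed. One plausible cause, and this is an assumption rather than something the gist confirms, is that the poison pill stops the consumer operator while forked consumer tasks are still sleeping, so `consumer.join()` returns before they print. The sketch below is not GPars code: it is a JDK-only Groovy illustration of the same kanban loop in which the main thread waits for every consumer thread to finish its current tray before exiting. The names `STOP`, `consumerCount`, and `consumed` are hypothetical, and trays are plain maps rather than the `Tray` class.

```groovy
import java.util.concurrent.LinkedBlockingQueue

// JDK-only sketch (no GPars); all names below are illustrative.
def upstream = new LinkedBlockingQueue()   // empty trays travel back to the producer
def downstream = new LinkedBlockingQueue() // filled trays travel to the consumers
def STOP = new Object()                    // sentinel playing the role of the poison pill
int consumerCount = 3
int wip = 6                                // number of trays == upper bound on products in flight
int soMany = 20
def consumed = Collections.synchronizedList([])

wip.times { upstream.put([card: it, product: null]) }

def producer = Thread.start {
    (0..soMany).each { prod ->
        def tray = upstream.take()         // blocks while all trays are in use
        tray.product = prod
        downstream.put(tray)
    }
    // one sentinel per consumer, queued behind every product
    consumerCount.times { downstream.put(STOP) }
}

def consumers = (1..consumerCount).collect {
    Thread.start {
        while (true) {
            def item = downstream.take()
            if (item.is(STOP)) break       // only reached after the current tray is done
            sleep 50                       // deliberately slow consumer
            consumed << item.product
            item.product = null
            upstream.put(item)
        }
    }
}

producer.join()
consumers*.join()                          // wait for in-flight work, not just for the sentinel
println "consumed ${consumed.size()} of ${soMany + 1} products"
// prints: consumed 21 of 21 products
```

Because each consumer thread only sees the sentinel after it has returned its current tray, no product is dropped no matter how long the `sleep` is. Whether an equivalent guarantee is available for GPars operators with `maxForks` set would need to be checked against the GPars documentation for the version in use.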
@macg33zr

I wanted to get the demo to work with Groovy 1.8.5 (GPars version 0.12). If the consumer is slow, not all of the products get consumed: I reduced soMany to 20, but consumption stops at product 17. I need to read more of the GPars documentation to figure this out...

My output looks like:
[2:1]
[0:0] [1:2] [3:3] [4:4] [5:5] 2:1 0:0 1:2 [2:6]
[0:7] [1:8] 3:3 4:4 5:5 [4:9] [5:10] [3:11] 2:6 [2:12] 1:8 [1:13] 0:7
[0:14] 4:9 [4:15] 5:10 [5:16] 3:11 [3:17] 2:12 [2:18] 1:13 [1:19] 0:14
[0:20] 4:15 5:16 3:17
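
To make the gap concrete, the output above can be tallied mechanically. The following Groovy sketch treats bracketed `[card:product]` tokens as production events and bare `card:product` tokens as consumption events, which is how the two `print` statements in the gist format them. The variable names are hypothetical and the string is simply the output pasted from this comment.

```groovy
// Output copied verbatim from the comment above.
def output = '''\
[2:1]
[0:0] [1:2] [3:3] [4:4] [5:5] 2:1 0:0 1:2 [2:6]
[0:7] [1:8] 3:3 4:4 5:5 [4:9] [5:10] [3:11] 2:6 [2:12] 1:8 [1:13] 0:7
[0:14] 4:9 [4:15] 5:10 [5:16] 3:11 [3:17] 2:12 [2:18] 1:13 [1:19] 0:14
[0:20] 4:15 5:16 3:17
'''

// Bracketed tokens are printed by the producer.
def produced = (output =~ /\[(\d+):(\d+)\]/).collect { it[2].toInteger() }

// Strip the bracketed tokens; what remains was printed by the consumer.
def withoutProduced = output.replaceAll(/\[\d+:\d+\]/, '')
def consumed = (withoutProduced =~ /(\d+):(\d+)/).collect { it[2].toInteger() }

def missing = (produced - consumed).sort()
println "produced=${produced.size()} consumed=${consumed.size()} missing=$missing"
// prints: produced=21 consumed=18 missing=[18, 19, 20]
```

So products 0 through 20 were all produced, but the last three (18, 19, and 20) never show up on the consumer side before the script ends.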
