-
-
Save macg33zr/2133649 to your computer and use it in GitHub Desktop.
Classic concurrent producer-consumer problem with using a kanban system to avoid buffer overflows when consumers are slow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import groovyx.gpars.dataflow.DataflowQueue | |
import groovyx.gpars.dataflow.operator.PoisonPill | |
import static groovyx.gpars.dataflow.Dataflow.operator | |
import java.util.concurrent.atomic.AtomicInteger | |
def upstream = new DataflowQueue() // empty trays travel back upstream to the producer | |
def downstream = new DataflowQueue() // trays with products travel to the consumer downstream | |
def prodWiring = [inputs: [upstream], outputs: [downstream], maxForks: 3 ] // maxForks is optional | |
def consWiring = [inputs: [downstream], outputs: [upstream], maxForks: 3 ] // maxForks is optional | |
class Tray { int card; def product } | |
int wip = prodWiring.maxForks + consWiring.maxForks // work in progress == max # of products in the system | |
wip.times { upstream << new Tray(card: it) } // put empty trays in the system along with its kanban card | |
def product = new AtomicInteger(0) // a dummy example product; could be anything | |
def soMany = 20 | |
operator prodWiring, { tray -> | |
def prod = product.andIncrement // producer is used concurrently: be careful with shared state | |
if (prod > soMany) { // we do not want to produce endlessly in this example | |
downstream << PoisonPill.instance // let the consumer finish his work, then stop | |
return | |
} | |
def zero = tray.card ? '' : "\n" // new line for tray number zero | |
print "$zero[$tray.card:$prod] " // visualize production point | |
tray.product = prod // put product in tray | |
downstream << tray // send tray with product inside to consumer | |
} | |
def consumer = operator consWiring, { tray -> | |
// If I make the consumer slow, all product made does not get consumed | |
sleep 2000 | |
print " $tray.card:$tray.product " // visualize product consumption and card association | |
tray.product == null // optionally remove product from tray | |
upstream << tray // send empty tray back upstream | |
} | |
consumer.join() // wait for the overall example to finish |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I wanted to get the demo to work with Groovy 1.8.5 (GPars version 0.12). If the consumer is slow, all the products do not get consumed (I reduced it to 20, but only see 17 coming out). I need to read some more GPars doc to figure this out...
My output looks like:
[2:1]
[0:0] [1:2] [3:3] [4:4] [5:5] 2:1 0:0 1:2 [2:6]
[0:7] [1:8] 3:3 4:4 5:5 [4:9] [5:10] [3:11] 2:6 [2:12] 1:8 [1:13] 0:7
[0:14] 4:9 [4:15] 5:10 [5:16] 3:11 [3:17] 2:12 [2:18] 1:13 [1:19] 0:14
[0:20] 4:15 5:16 3:17