Skip to content

Instantly share code, notes, and snippets.

@alok87
Last active March 11, 2021 15:59
Show Gist options
  • Save alok87/d9d08a95b3af38eea7e374339f66c199 to your computer and use it in GitHub Desktop.
Save alok87/d9d08a95b3af38eea7e374339f66c199 to your computer and use it in GitHub Desktop.
subscription_manager_with_debug_flags_1.27.2
func (bc *brokerConsumer) subscriptionManager() {
var buffer []*partitionConsumer
for {
if len(buffer) > 0 {
select {
case event, ok := <-bc.input:
if !ok {
klog.V(2).Info("subscriptionManager() empty receive (bc.input closed)")
goto done
}
klog.V(2).Infof("subscriptionManager() received %s, buffer(%v)=%v", event.topic, len(buffer), buffer)
buffer = append(buffer, event)
klog.V(2).Infof("subscriptionManager() received %s, buffer(%v)=%v (appended)", event.topic, len(buffer), buffer)
case bc.newSubscriptions <- buffer:
klog.V(2).Infof("subscriptionManager() sent buffer(%v)=%v, buffer set nil", len(buffer), buffer)
buffer = nil
case bc.wait <- none{}:
klog.V(2).Info("subscriptionManager() bc.wait()")
}
} else {
select {
case event, ok := <-bc.input:
if !ok {
klog.V(2).Info("subscriptionManager() buffer=0, empty receive (bc.input closed)")
goto done
}
klog.V(2).Infof("subscriptionManager() received %s, buffer(%v)=%v", event.topic, len(buffer), buffer)
buffer = append(buffer, event)
klog.V(2).Infof("subscriptionManager() received %s, buffer(%v)=%v (appended)", event.topic, len(buffer), buffer)
case bc.newSubscriptions <- nil:
klog.V(2).Info("subscriptionManager() bc.newSubscriptions <- nil")
}
}
}
done:
klog.V(2).Info("subscriptionManager() close(bc.wait)")
close(bc.wait)
if len(buffer) > 0 {
klog.V(2).Infof("subscriptionManager() buffer > 0, sent buffer(%v)=%v", len(buffer), buffer)
bc.newSubscriptions <- buffer
}
klog.V(2).Info("subscriptionManager() close(bc.newSubscriptions)")
close(bc.newSubscriptions)
}
//subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
func (bc *brokerConsumer) subscriptionConsumer() {
klog.V(2).Infof("subscriptionConsumer(%v) STARTING FOR LOOP", bc.broker.ID())
klog.V(2).Infof("subscriptionConsumer(%v) waiting", bc.broker.ID())
<-bc.wait // wait for our first piece of work
klog.V(2).Infof("subscriptionConsumer(%v) resuming...", bc.broker.ID())
for newSubscriptions := range bc.newSubscriptions {
klog.V(2).Infof("subscriptionConsumer(%v) updating newSubscription=%+v", bc.broker.ID(), newSubscriptions)
bc.updateSubscriptions(newSubscriptions)
if len(bc.subscriptions) == 0 {
// We're about to be shut down or we're about to receive more subscriptions.
// Either way, the signal just hasn't propagated to our goroutine yet.
klog.V(2).Infof("subscriptionConsumer(%v) bc.subscriptions==0 waiting", bc.broker.ID())
<-bc.wait
continue
}
klog.V(2).Infof("subscriptionConsumer(%v) fetchNewMessages()", bc.broker.ID())
response, err := bc.fetchNewMessages()
if err != nil {
klog.V(2).Infof("subscriptionConsumer(%v) aborting due to fetchNewMessages err(), err =%+v", bc.broker.ID(), err)
Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
bc.abort(err)
return
}
bc.acks.Add(len(bc.subscriptions))
for child := range bc.subscriptions {
child.feeder <- response
}
bc.acks.Wait()
bc.handleResponses()
}
}
func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
for _, child := range newSubscriptions {
klog.V(2).Infof("updateSubscriptions(%s)", child.topic)
bc.subscriptions[child] = none{}
Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
}
for child := range bc.subscriptions {
select {
case <-child.dying:
Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
close(child.trigger)
delete(bc.subscriptions, child)
default:
// no-op
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment