一个 streaming app,从 kafka 某个 topic 获取数据,用的 high level api,最终应该是只有一个 KafkaReceiver 对吧 然后这个 KafkaReceiver 我看代码应该只会分发到一个 executos 去接收数据
是的
即使topic 有多个 partition
是的 你也可以弄多个Receiver,每个Receiver可以通过设置多线程接收的方式
怎么弄多个?一个 ReceiverInputDStream#getReveier 只能返回一个 receiver。多线程我知道 但是貌似不能搞到多台机器上运行多个 receiver 吧
多个Receiver Union
我猜测是我们平时直接调用的 api 搞了多个 ReceiverInputDStream 然后返回给我们之前 union 了一下
是的, 具体代码如下:
val kafkaDStreams = (1 to kafkaDStreamsNum).map { _ => KafkaUtils.createStream(
ssc,
zookeeper,
groupId,
Map("your topic" -> 1),
if (memoryOnly) StorageLevel.MEMORY_ONLY else StorageLevel.MEMORY_AND_DISK_SER_2)}
val unionDStream = ssc.union(kafkaDStreams)
unionDStream
多谢多谢 终于搞明白了
然而一般一个就足够了 唯一的问题对Receiver所在的Executor内存压力太大 这种分离式的数据接收方式不可取 需要蓄水池 Receiver是注水,后面的消费是放水。很容易不匹配导致蓄水池满了 不知道能理解么
就是每个 receiver 都会有一个类似缓存 buffer 样的东西是吧
对的 接受和消费分开 必然需要蓄水池 蓄水池满了 必然有内存问题 Direct的模式则跳过蓄水池 拉一批消费一批 而对于Receiver模式,是Receiver维护了一个Buffer,Buffer满了放BlockManager(而且这个BlockManager仅仅指的是Receiver所在的节点,并不能放到其他节点上) 这里的BlockManager 是我说的蓄水池 计算周期到了后BlockManager 的数据makeRDD然后进行计算
官网docs提到这个问题了