原文地址:https://www.douyacun.com/article/775c0950f1b6fb4cd702328dda419694
概述:
背景:和producer同样的用途,统计博客网页浏览数据
kafka consumer 从 broker 中 pull 数据,producer push 数据到 broker 中。
优点:consumer可以承受能力的范围内控制消费速度,而且可以批量获取消息块
缺点:如果broker中没有数据,consumer会有轮询broker接口,这里kafka提供了参数来阻塞fetch请求直到有数据可以消费
消费者组的特性:
消费者位移:
kafka面临的问题,如何让broker 和 consumer 就被消费的数据保持一致性
kafka的是如何实现消息确认机制的:
每一个 partition 在任意给定的时间内只能被每个订阅了这个 topic 的 consumer 组中的一个 consumer 消费,只需要保存这个consumer下一条要消费的消息的位移即可
conf.Consumer.Offsets.Initial
来再次消费正常情况下producer push一条消息到broker,然后consumer从broker pull消息然后消费,这只是理想状态的情况下。我们需要考虑的是发生异常的情况kafka如何做到保证消息是不丢失,不重复消费的。
如果一个 producer 在试图发送消息的时候发生了网络故障, 则不确定网络错误发生在消息提交之前还是之后,broker到底有没有收到这条消息并持久化?
答:如果我们想要确保消息可以重传,但绝不丢失。kafka 0.11.0.0 之前的版本就是重发机制,之后的版本增加了幂等性,会为每条消息分配序列号来避免在broker log中产生重复消息。同时kafka提供了事务机制,多条消息写入log要么都成功要么都失败。
kafka所有的副本都有相同的 log 和相同的 offset。consumer 负责控制它在 log 中的位置,理想状态下consumer永远不会崩溃,没问题。如果consumer崩溃了,这个topic partition必须要有另一个进程来接管,此时该从哪个offset开始接管?
答:2种解决方案,步骤就是先读取,在消费,最后更新位移,这样可以保证消息不丢失,可能会造成消息重复消费
重点:consumer消费数据时,要做一次幂等性消费,避免重复消费数据
Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者 接收主题一部分分区的消息。
再均衡:分区的所有权从一个消费者转移到另一个消费者
消费者通过向consumer-group 协调器的broker发送心跳来维持他们和群组的从属关系以及它们对分区的所有权关系,只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明还在正常读取分区的消息,
如果消费者发生崩溃,consumer-group协调器broker,在确认了该consumer死亡后会出发再均衡。
max.poll.interval.ms
控制发送心跳的间隔时间
session.timeout.ms
控制失联多久判定为死亡
封装一下go consumer支持
type ConsumerGroup interface {
Consume(topic []string, handler sarama.ConsumerGroupHandler)
}
type consumerGroup struct {
ctx context.Context
client sarama.ConsumerGroup
groupId string
}
func NewConsumerGroup(ctx context.Context, groupId string) ConsumerGroup {
conf := sarama.NewConfig()
conf.Version = sarama.V0_11_0_2
conf.Consumer.Return.Errors = true
client, err := sarama.NewConsumerGroup(config.Kafka.Broker(), groupId, conf)
if err != nil {
panic(errors.Wrap(err, "kafka new consumer client err"))
}
// Track errors
go func() {
for err := range client.Errors() {
logger.Wrapf(err, "kafka %s consume err", groupId)
}
}()
go func() {
for {
select {
// 主进程退出,通知consumer关闭
case <-ctx.Done():
_ = client.Close()
//logger.Infof("quit: kafka consumer %s", groupId)
return
}
}
}()
return &consumerGroup{
ctx: ctx,
client: client,
groupId: groupId,
}
}
func (c *consumerGroup) Consume(topic []string, handler sarama.ConsumerGroupHandler) {
go func() {
defer c.client.Close()
for {
err := c.client.Consume(c.ctx, topic, handler)
if err != nil {
switch err {
case sarama.ErrClosedClient, sarama.ErrClosedConsumerGroup:
// 退出
logger.Infof("quit: kafka consumer %s", c.groupId)
return
case sarama.ErrOutOfBrokers:
logger.Errorf("kafka 崩溃了~")
default:
logger.Errorf("kafka exception: %s", err.Error())
}
time.Sleep(1 * time.Second)
}
}
}()
}