kafka不丢数据方案、数据重复处理、数据乱序问题处理
一Kafka不丢数据方案
kafka处理数据不丢失,主要分为producer角度、broker角度、consumer角度
**1、【producer角度】**设置合适的ACK
Ack = 0
相当于异步发送,消息发送完毕即offset增加,继续生产。
Ack = 1
leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。
Ack = -1
leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。
ack在生产者指定,不同生产者可以不同。
ack设为-1,需要ISR里的所有follower应答,想要真正不丢数据,需要配合参数:
min.insync.replicas: ack为-1时生效,ISR里应答的最小follower数量。
默认为1(leader本身也算一个!),所以当ISR里除了leader本身,没有其他的follower,即使ack设为-1,相当于1的效果,不能保证不丢数据。
需要将min.insync.replicas设置大于等于2,才能保证有其他副本同步到数据。
retries = Integer.MAX_VALUE,无限重试。如果上述两个条件不满足,写入一直失败,就会无限次重试,保证数据必须成功的发送给两个副本,如果做不到,就不停的重试,除非是面向金融级的场景,面向企业大客户,或者是广告计费,跟钱的计算相关的场景下,才会通过严格配置保证数据绝对不丢失
kafka-topics.sh --bootstrap-server hadoop1:9092 --create --topic testisr2 --replication-factor 3 --partitions 4 --config min.insync.replicas=2
【producer端总结】
完全不丢结论:ack=-1 + min.insync.replicas>=2 +无限重试
2、【broker角度】
副本数大于1
min.insync.replicas大于1
3、【consumer角度】
手动提交offset,flink结合checkpoint
二、Kafka数据重复
产生的原因:发生重试造成的重复。
幂等性 + ack-1 + 事务
Kafka数据重复,可以在下一级:SparkStreaming、redis、Flink或者Hive中dwd层去重,去重的手段:分组、按照id开窗只取第一个值;
Kafka幂等性原理(单分区单会话):producer重试引起的乱序和重复
1、重复问题的解决:
1)Kafka增加了pid和seq。Producer中每个RecordBatch都有一个单调递增的seq; Broker上每个topic的partition也会维护pid-seq的映射,并且每Commit都会更新lastSeq。
2)recordBatch到来时,broker会先检查RecordBatch再保存数据:
如果batch中 baseSeq(第一条消息的seq)比Broker维护的序号(lastSeq)大1,则保存数据,否则不保存。
三、消息乱序问题解决方案
假设我们有5个请求,batch1、batch2、batch3、batch4、batch5;
如果只有batch2 ack failed,3、4、5都保存了,那2将会随下次batch重发而造成重复。
可以设置max.in.flight.requests.per.connection=1(客户端在单个连接上能够发送的未响应请求的个数)来解决乱序,但降低了系统吞吐。
新版本kafka设置enable.idempotence=true后能够动态调整max-in-flight-request。正常情况下max.in.flight.requests.per.connection大于1。当重试请求到来时,batch 会根据 seq重新添加到队列的合适位置,并把max.in.flight.requests.per.connection设为1,这样它前面的 batch序号都比它小,只有前面的都发完了,它才能发。
kafka不丢数据方案、数据重复处理、数据乱序问题处理
一Kafka不丢数据方案
kafka处理数据不丢失,主要分为producer角度、broker角度、consumer角度
**1、【producer角度】**设置合适的ACK
Ack = 0
相当于异步发送,消息发送完毕即offset增加,继续生产。
Ack = 1
leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。
Ack = -1
leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。
ack在生产者指定,不同生产者可以不同。
ack设为-1,需要ISR里的所有follower应答,想要真正不丢数据,需要配合参数:
min.insync.replicas: ack为-1时生效,ISR里应答的最小follower数量。
默认为1(leader本身也算一个!),所以当ISR里除了leader本身,没有其他的follower,即使ack设为-1,相当于1的效果,不能保证不丢数据。
需要将min.insync.replicas设置大于等于2,才能保证有其他副本同步到数据。
retries = Integer.MAX_VALUE,无限重试。如果上述两个条件不满足,写入一直失败,就会无限次重试,保证数据必须成功的发送给两个副本,如果做不到,就不停的重试,除非是面向金融级的场景,面向企业大客户,或者是广告计费,跟钱的计算相关的场景下,才会通过严格配置保证数据绝对不丢失
kafka-topics.sh --bootstrap-server hadoop1:9092 --create --topic testisr2 --replication-factor 3 --partitions 4 --config min.insync.replicas=2
【producer端总结】
完全不丢结论:ack=-1 + min.insync.replicas>=2 +无限重试
2、【broker角度】
副本数大于1
min.insync.replicas大于1
3、【consumer角度】
手动提交offset,flink结合checkpoint
二、Kafka数据重复
产生的原因:发生重试造成的重复。
幂等性 + ack-1 + 事务
Kafka数据重复,可以在下一级:SparkStreaming、redis、Flink或者Hive中dwd层去重,去重的手段:分组、按照id开窗只取第一个值;
Kafka幂等性原理(单分区单会话):producer重试引起的乱序和重复
1、重复问题的解决:
1)Kafka增加了pid和seq。Producer中每个RecordBatch都有一个单调递增的seq; Broker上每个topic的partition也会维护pid-seq的映射,并且每Commit都会更新lastSeq。
2)recordBatch到来时,broker会先检查RecordBatch再保存数据:
如果batch中 baseSeq(第一条消息的seq)比Broker维护的序号(lastSeq)大1,则保存数据,否则不保存。
三、消息乱序问题解决方案
假设我们有5个请求,batch1、batch2、batch3、batch4、batch5;
如果只有batch2 ack failed,3、4、5都保存了,那2将会随下次batch重发而造成重复。
可以设置max.in.flight.requests.per.connection=1(客户端在单个连接上能够发送的未响应请求的个数)来解决乱序,但降低了系统吞吐。
新版本kafka设置enable.idempotence=true后能够动态调整max-in-flight-request。正常情况下max.in.flight.requests.per.connection大于1。当重试请求到来时,batch 会根据 seq重新添加到队列的合适位置,并把max.in.flight.requests.per.connection设为1,这样它前面的 batch序号都比它小,只有前面的都发完了,它才能发。
发布评论