Kafka 笔记—可靠性、幂等性和事务
可靠性
如何保证消息不丢失
Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
已提交的消息
当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。
有限度的持久化保证
假如一条消息保存在 N 个 Kafka Broker 上,那么至少这 N 个 Broker 至少有一个存活,才能保证消息不丢失。
丢失数据案例
生产者程序丢失数据
由于 Kafka Producer 是异步发送的,调用完 producer.send(msg) 并不能认为消息已经发送成功。
所以,在 Producer 永远要使用带有回调通知的发送 API,使用 producer.send(msg,callback)。一旦出现消息提交失败的情况,可以由针对性地进行处理。
消费者端丢失数据
消费者是先更新 offset,再消费消息。如果这个时候消费者突然宕机了,那么这条消息就会丢失。
所以我们要先消费消息,再更新 offset 位置。但是这样会导致消息重复消费。
还有一种情况就是 consumer 获取到消息后开启了多个线程异步处理消息,而 consumer 自动地向前更新 offset。假如其中某个线程运行失败了,那么消息就丢失了。
遇到这样的情况,consumer 不要开启自动提交位移,而是要应用程序手动提交位移。
最佳实现
解释第二条和第六条:
如果 ISR 中只有 1 个副本了,acks=all 也就相当于 acks=1 了,引入 min.insync.replicas 的目的就是为了做一个下限的限制:不能只满足于 ISR 全部写入,还要保证 ISR 中的写入个数不少于 min.insync.replicas。
幂等性
在 0.11.0.0 版本引入了创建幂等性 Producer 的功能。仅需要设置 props.put(“enable.idempotence”,true),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。
enable.idempotence 设置成 true 后,Producer 自动升级成幂等性 Producer。Kafka 会自动去重。Broker 会多保存一些字段。当 Producer 发送了相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了。
作用范围:
事务
Kafka 在 0.11 版本开始提供对事务的支持,提供是 read committed 隔离级别的事务。保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。
事务性 Producer
保证多条消息原子性地写入到多个分区中。这批消息要么全部成功,要不全部失败。事务性 Producer 也不惧进程重启。
Producer 端的设置:
除此之外,还要加上调用事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,分别应对事务的初始化、事务开始、事务提交以及事务终止。
如下:
这段代码能保证 record1 和 record2 被当做一个事务同一提交到 Kafka,要么全部成功,要么全部写入失败。
Consumer 端的设置:
设置 isolation.level 参数,目前有两个取值: