Kafka 笔记—可靠性、幂等性和事务

可靠性

如何保证消息不丢失

Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。

已提交的消息

当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。

有限度的持久化保证

假如一条消息保存在 N 个 Kafka Broker 上,那么至少这 N 个 Broker 至少有一个存活,才能保证消息不丢失。

丢失数据案例

生产者程序丢失数据

由于 Kafka Producer 是异步发送的,调用完 producer.send(msg) 并不能认为消息已经发送成功。

所以,在 Producer 永远要使用带有回调通知的发送 API,使用 producer.send(msg,callback)。一旦出现消息提交失败的情况,可以由针对性地进行处理。

消费者端丢失数据

消费者是先更新 offset,再消费消息。如果这个时候消费者突然宕机了,那么这条消息就会丢失。

所以我们要先消费消息,再更新 offset 位置。但是这样会导致消息重复消费。

还有一种情况就是 consumer 获取到消息后开启了多个线程异步处理消息,而 consumer 自动地向前更新 offset。假如其中某个线程运行失败了,那么消息就丢失了。

遇到这样的情况,consumer 不要开启自动提交位移,而是要应用程序手动提交位移。

最佳实现

解释第二条和第六条:

如果 ISR 中只有 1 个副本了,acks=all 也就相当于 acks=1 了,引入 min.insync.replicas 的目的就是为了做一个下限的限制:不能只满足于 ISR 全部写入,还要保证 ISR 中的写入个数不少于 min.insync.replicas。

幂等性

在 0.11.0.0 版本引入了创建幂等性 Producer 的功能。仅需要设置 props.put(“enable.idempotence”,true),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。

enable.idempotence 设置成 true 后,Producer 自动升级成幂等性 Producer。Kafka 会自动去重。Broker 会多保存一些字段。当 Producer 发送了相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了。

作用范围:

事务

Kafka 在 0.11 版本开始提供对事务的支持,提供是 read committed 隔离级别的事务。保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。

事务性 Producer

保证多条消息原子性地写入到多个分区中。这批消息要么全部成功,要不全部失败。事务性 Producer 也不惧进程重启。

Producer 端的设置:

除此之外,还要加上调用事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,分别应对事务的初始化、事务开始、事务提交以及事务终止。

如下:

这段代码能保证 record1 和 record2 被当做一个事务同一提交到 Kafka,要么全部成功,要么全部写入失败。

Consumer 端的设置:

设置 isolation.level 参数,目前有两个取值: