由 Flink 与 Kafka 实践探究 Kafka 的两个问题

笔者在某次实践过程中,搭建了一个 Flink 监控程序,监控wikipedia编辑,对编辑者编辑的字节数进行实时计算,最终把数据 sink 到 kafka 的消费者中展示出来,监控程序本身比较简单,只要在程序中指定好WikipediaEditsSource源并配置好 sink 与 kafka 关联就可以,类似一个略微复杂版的wordcount,按照网络上的教程,在实践的最后,开启 zookeeper 服务和 kafka 服务,接着用

|

1

2

|

kafka-console-producer –topic wiki-result  –broker-list localhost:9092

|

这条命令创建一个名为wiki-resulttopic,然后运行监控程序,最后用

|

1

2

|

kafka-console-consumer –bootstrap-server –zookeeper localhost: 9092–topic wiki-result

|

启动消费者,就可以在终端窗口里观察到源源不断的wikipedia数据

当笔者第二天再次跑这个监控程序时,发现上次执行的命令

|

1

2

|

kafka-console-producer –topic wiki-result  –broker-list localhost:9092

|

是生产者命令,然而此例中的生产者实际上是 Fink 监控程序,那么原作者为何使用kafka-console-producer命令去创建 topic 而不是用kafka-topics命令呢?

|

1

2

|

kafka-console-producer –topic wiki-result  –broker-list localhost:9092

|

命令是生产者指定 topic,是否自动创建了 topic 呢?

笔者尝试把现有的topic:wiki-result删掉,然后重新创建 topic,提示如下,并没有真正删除,为此笔者去查了下相关资料,将 topic 创建与删除的原理彻底弄懂了。

在 Kafka 中,Topic 是一个存储消息的逻辑概念,不同的 topic 在物理上来说是分开存储的,可以有多个 producer 向他 push 消息,也可以有多个 consumer 去 pull 消息,每个 Topic 可以划分多个分区,每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。每个消息在被添加到分区时,都会被分配一个连续的序列号 offset,它是消息在此分区中的唯一编号,Kafka 通过 offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即 Kafka 只保证在同一个分区内的消息是有序的。

通过命令

|

1

2

|

kafka-topics –create –zookeeper localhost:2181 –replication-factor 2 –partitions 1 –topic test-topic

|

创建了 1 个名为test-topic的 topic,拥有 1 个分区,每个分区分配 2 个副本。创建逻辑如图,总的来说就是后台逻辑会监听 zookeeper 下对应的目录节点,一旦发起 topic 创建命令,该命令会创建新的数据节点从而触发后台的创建逻辑。

命令行部分比较直白,无非就是一些基本校验,分配副本 (尽可能保证分区的副本平均分配到每个 broker 上),把分配方案持久化到zookeeper的/brokers/topics/节点下。

后台逻辑部分主要由controller负责,controller内部保存了很多信息,其中有一个分区状态机,用于记录 topic 各个分区的状态。这个状态机内部注册了一些 zookeeper 监听器。Controller 在启动的时候会创建这些监听器。其中一个监听器 (TopicChangeListener) 就是用于监听 zookeeper 的/brokers/topics目录的子节点变化的。一旦该目录子节点数发生变化就会调用这个监听器的处理方法。TopicChangeListener监听器一方面会更新 controller 的缓存信息 (比如更新集群当前所有的 topic 列表以及更新新增 topic 的分区副本分配方案缓存等),另一方面就是创建对应的分区及其副本对象并为每个分区确定 leader 副本及 ISR。至此,整个 topic 的创建就完成了!

除了使用kafka-topics –create创建 topic 外,还可以使用kafka-console-producer发布消息时创建,kafka 第一步先获取 topic 的 leader 信息,当发现不可用的时候,在去创建此 topic。

Kafka 删除 topic 的命令:

|

1

2

|

kafka-topics.sh –zookeeper localhost:2181 –delete –topic test-topic

|

然而此命令不能真正删除 topic,只是在 zookeeper 的/admin/delete_topics下创建一个临时节点。

Kafka controller在启动的时候会注册对于 Zookeeper 节点/admin/delete_topics的子节点变更监听器,并创建一个单独的线程,执行 topic 删除的操作,监听器捕获到删除时创建的临时节点,立刻触发删除逻辑,查询 test-topic 是否正在被使用,根据其状态决定是否删除。

那么什么时候线程会真正删除此 topic 呢?只有当在server.properties配置了delete.topic.enable=true时并重新启动 Kafka,此 Topic 才会被真正删除。

至此 Topic 的创建和删除原理已经清楚了,而对于在实践过程中遇到的问题也清晰了。