笔者在某次实践过程中,搭建了一个Flink监控程序,监控wikipedia
编辑,对编辑者编辑的字节数进行实时计算,最终把数据sink到kafka的消费者中展示出来,监控程序本身比较简单,只要在程序中指定好WikipediaEditsSource
源并配置好sink与kafka关联就可以,类似一个略微复杂版的wordcount
,按照网络上的教程,在实践的最后,开启zookeeper服务和kafka服务,接着用
kafka-console-producer --topic wiki-result --broker-list localhost:9092
这条命令创建一个名为wiki-result
的topic
,然后运行监控程序,最后用
kafka-console-consumer --bootstrap-server --zookeeper localhost: 9092--topic wiki-result
启动消费者,就可以在终端窗口里观察到源源不断的wikipedia
数据
当笔者第二天再次跑这个监控程序时,发现上次执行的命令
kafka-console-producer --topic wiki-result --broker-list localhost:9092
是生产者命令,然而此例中的生产者实际上是Fink监控程序,那么原作者为何使用kafka-console-producer
命令去创建topic而不是用kafka-topics
命令呢?
kafka-console-producer --topic wiki-result --broker-list localhost:9092
命令是生产者指定topic,是否自动创建了topic呢?
笔者尝试把现有的topic:wiki-result
删掉,然后重新创建topic,提示如下,并没有真正删除,为此笔者去查了下相关资料,将topic创建与删除的原理彻底弄懂了。
在 Kafka 中,Topic 是一个存储消息的逻辑概念,不同的topic在物理上来说是分开存储的,可以有多个producer向他push消息,也可以有多个consumer去pull消息,每个 Topic 可以划分多个分区,每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。每个消息在被添加到分区时,都会被分配一个连续的序列号 offset,它是消息在此分区中的唯一编号,Kafka 通过 offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即 Kafka 只保证在同一个分区内的消息是有序的。
通过命令
kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test-topic
创建了1个名为test-topic
的topic,拥有1个分区,每个分区分配2个副本。创建逻辑如图,总的来说就是后台逻辑会监听zookeeper下对应的目录节点,一旦发起topic创建命令,该命令会创建新的数据节点从而触发后台的创建逻辑。
命令行部分比较直白,无非就是一些基本校验,分配副本(尽可能保证分区的副本平均分配到每个broker上),把分配方案持久化到zookeeper的/brokers/topics/
节点下。
后台逻辑部分主要由controller
负责,controller
内部保存了很多信息,其中有一个分区状态机,用于记录topic各个分区的状态。这个状态机内部注册了一些zookeeper监听器。Controller在启动的时候会创建这些监听器。其中一个监听器(TopicChangeListener)就是用于监听zookeeper的/brokers/topics
目录的子节点变化的。一旦该目录子节点数发生变化就会调用这个监听器的处理方法。TopicChangeListener
监听器一方面会更新controller的缓存信息(比如更新集群当前所有的topic列表以及更新新增topic的分区副本分配方案缓存等),另一方面就是创建对应的分区及其副本对象并为每个分区确定leader副本及ISR。至此,整个topic的创建就完成了!
除了使用kafka-topics –create
创建topic外,还可以使用kafka-console-producer
发布消息时创建,kafka第一步先获取topic的leader信息,当发现不可用的时候,在去创建此topic。
Kafka 删除topic的命令:
kafka-topics.sh --zookeeper localhost:2181 --delete –topic test-topic
然而此命令不能真正删除topic,只是在zookeeper的/admin/delete_topics
下创建一个临时节点。
Kafka controller
在启动的时候会注册对于Zookeeper节点/admin/delete_topics
的子节点变更监听器,并创建一个单独的线程,执行topic删除的操作,监听器捕获到删除时创建的临时节点,立刻触发删除逻辑,查询test-topic是否正在被使用,根据其状态决定是否删除。
那么什么时候线程会真正删除此topic呢?只有当在server.properties
配置了delete.topic.enable=true
时并重新启动Kafka,此Topic才会被真正删除。
至此Topic的创建和删除原理已经清楚了,而对于在实践过程中遇到的问题也清晰了。