首页 网维知识库 由Flink与Kafka实践探究Kafka的两个问题

由Flink与Kafka实践探究Kafka的两个问题

笔者在某次实践过程中,搭建了一个Flink监控程序,监控wikipedia编辑,对编辑者编辑的字节数进行实时计算,最终把数据sink到kafka的消费者中展示出来,监控程序本身比较…

笔者在某次实践过程中,搭建了一个Flink监控程序,监控wikipedia编辑,对编辑者编辑的字节数进行实时计算,最终把数据sink到kafka的消费者中展示出来,监控程序本身比较简单,只要在程序中指定好WikipediaEditsSource源并配置好sink与kafka关联就可以,类似一个略微复杂版的wordcount,按照网络上的教程,在实践的最后,开启zookeeper服务和kafka服务,接着用

kafka-console-producer --topic wiki-result  --broker-list localhost:9092

这条命令创建一个名为wiki-resulttopic,然后运行监控程序,最后用

kafka-console-consumer --bootstrap-server --zookeeper localhost: 9092--topic wiki-result

启动消费者,就可以在终端窗口里观察到源源不断的wikipedia数据

由Flink与Kafka实践探究Kafka的两个问题插图

当笔者第二天再次跑这个监控程序时,发现上次执行的命令

kafka-console-producer --topic wiki-result  --broker-list localhost:9092

是生产者命令,然而此例中的生产者实际上是Fink监控程序,那么原作者为何使用kafka-console-producer命令去创建topic而不是用kafka-topics命令呢?

kafka-console-producer --topic wiki-result  --broker-list localhost:9092

命令是生产者指定topic,是否自动创建了topic呢?

笔者尝试把现有的topic:wiki-result删掉,然后重新创建topic,提示如下,并没有真正删除,为此笔者去查了下相关资料,将topic创建与删除的原理彻底弄懂了。

由Flink与Kafka实践探究Kafka的两个问题插图1

在 Kafka 中,Topic 是一个存储消息的逻辑概念,不同的topic在物理上来说是分开存储的,可以有多个producer向他push消息,也可以有多个consumer去pull消息,每个 Topic 可以划分多个分区,每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。每个消息在被添加到分区时,都会被分配一个连续的序列号 offset,它是消息在此分区中的唯一编号,Kafka 通过 offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即 Kafka 只保证在同一个分区内的消息是有序的。

由Flink与Kafka实践探究Kafka的两个问题插图2

通过命令

kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test-topic

创建了1个名为test-topic的topic,拥有1个分区,每个分区分配2个副本。创建逻辑如图,总的来说就是后台逻辑会监听zookeeper下对应的目录节点,一旦发起topic创建命令,该命令会创建新的数据节点从而触发后台的创建逻辑。

由Flink与Kafka实践探究Kafka的两个问题插图3

命令行部分比较直白,无非就是一些基本校验,分配副本(尽可能保证分区的副本平均分配到每个broker上),把分配方案持久化到zookeeper的/brokers/topics/节点下。

后台逻辑部分主要由controller负责,controller内部保存了很多信息,其中有一个分区状态机,用于记录topic各个分区的状态。这个状态机内部注册了一些zookeeper监听器。Controller在启动的时候会创建这些监听器。其中一个监听器(TopicChangeListener)就是用于监听zookeeper的/brokers/topics目录的子节点变化的。一旦该目录子节点数发生变化就会调用这个监听器的处理方法。TopicChangeListener监听器一方面会更新controller的缓存信息(比如更新集群当前所有的topic列表以及更新新增topic的分区副本分配方案缓存等),另一方面就是创建对应的分区及其副本对象并为每个分区确定leader副本及ISR。至此,整个topic的创建就完成了!

除了使用kafka-topics –create创建topic外,还可以使用kafka-console-producer发布消息时创建,kafka第一步先获取topic的leader信息,当发现不可用的时候,在去创建此topic。

Kafka 删除topic的命令:

kafka-topics.sh --zookeeper localhost:2181 --delete –topic test-topic

然而此命令不能真正删除topic,只是在zookeeper的/admin/delete_topics下创建一个临时节点。

Kafka controller在启动的时候会注册对于Zookeeper节点/admin/delete_topics的子节点变更监听器,并创建一个单独的线程,执行topic删除的操作,监听器捕获到删除时创建的临时节点,立刻触发删除逻辑,查询test-topic是否正在被使用,根据其状态决定是否删除。

那么什么时候线程会真正删除此topic呢?只有当在server.properties配置了delete.topic.enable=true时并重新启动Kafka,此Topic才会被真正删除。

至此Topic的创建和删除原理已经清楚了,而对于在实践过程中遇到的问题也清晰了。

免责声明:文章内容不代表本站立场,本站不对其内容的真实性、完整性、准确性给予任何担保、暗示和承诺,仅供读者参考,文章版权归原作者所有。如本文内容影响到您的合法权益(内容、图片等),请及时联系本站,我们会及时删除处理。

作者: 3182235786a

为您推荐

windows8

windows8

Windows 8 是微软公司于 2012 年推出的一款操作系统,因其独特的界面设计和功能受到广泛关注。本文将从 Win...
Windows 下载指南:获取最新版本的 Windows 操作系统

Windows 下载指南:获取最新版本的 Windows 操作系统

作为全球最受欢迎的操作系统之一,Windows 提供了丰富的功能和用户友好的界面。如果您想获取最新版本的 Windows...
windows资源管理器已停止工作

windows资源管理器已停止工作

Windows 资源管理器已停止工作是 Windows 操作系统中常见的一个问题,通常表现为资源管理器窗口无法正常打开或...
Windows 10 激活方法详解:轻松激活您的操作系统

Windows 10 激活方法详解:轻松激活您的操作系统

购买了全新的Windows 10操作系统后,如何激活它成为许多用户关注的问题。本文将为您详细介绍Windows 10的激...
windows10激活工具

windows10激活工具

Windows 10 激活工具是一款用于激活 Windows 10 操作系统的软件。通过使用激活工具,用户可以轻松地激活...

发表回复

返回顶部