kafka知识点总结

    技术2024-11-14  21

    kafka知识点总结

    一、名词解释

    Kafka 的发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。

    1.Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。 2.Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。 3.Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列 4.Segment:partition物理上由多个segment组成,每个Segment存着message信息 5.Producer : 生产message发送到topic 6.Consumer : 订阅topic消费message, consumer作为一个线程来消费 7.Consumer Group:一个Consumer Group包含多个consumer, 这个是预先在配置文件中配置好的。各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group ) 中的一个consumer(consumer 线程 )消费,如果一个message可以被多个consumer(consumer 线程 ) 消费的话,那么这些consumer必须在不同的组。kafka为了保证吞吐量(不需要加锁),只允许一个consumer线程去访问一个partition。如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer thread去消费。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。

    二、性能为什么好

    1.磁盘io:①kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;②同时为了减少磁盘写入的次数broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数

    2.网络io:①对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;②对于consumer端,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定;③对于kafka broker端,有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"/“内核内存”/“进程内存”/“网络缓冲区”,多者之间的数据copy)

    sendfile函数在两个文件描述符之间传递数据(完全在内核中操作),从而避免了内核缓冲区和用户缓冲区之间的数据拷贝,效率很高,被称为零拷贝

    函数定义为:

    ssize_t senfile(int out_fd,int in_fd,off_t* offset,size_t count);

    in_fd参数是待读出内容的文件描述符,out_fd参数是待写入内容的文件描述符。offset参数指定从读入文件流的哪个位置开始读,如果为空,则使用读入文件流默认的起始位置。count参数指定文件描述符in_fd和out_fd之间传输的字节数。

    in_fd必须是一个支持类似mmap函数的文件描述符,即它必须指向真实的文件,不能是socket和管道,而out_fd必须是一个socket 首先我们来看看传统的read/write方式进行socket的传输。 当需要对一个文件进行传输的时候,具体流程细节如下:

    1:调用read函数,文件数据copy到内核缓冲区 2:read函数返回,文件数据从内核缓冲区copy到用户缓冲区 3:write函数调用,将文件数据从用户缓冲区copy到内核与socket相关的缓冲区 4:数据从socket缓冲区copy到相关协议引擎。 在这个过程中发生了四次copy操作。

    硬盘->内核->用户->socket缓冲区(内核)->协议引擎。

    而sendfile的工作原理呢??

    1、系统调用 sendfile() 通过 DMA 把硬盘数据拷贝到 kernel buffer,然后数据被 kernel 直接拷贝到另外一个与 socket 相关的 kernel buffer。这里没有 用户态和核心态 之间的切换,在内核中直接完成了从一个 buffer 到另一个 buffer 的拷贝。 2、DMA 把数据从 kernel buffer 直接拷贝给协议栈,没有切换,也不需要数据从用户态和核心态,因为数据就在 kernel 里。

    DMA(Direct Memory Access,直接存储器访问) 是所有现代电脑的重要特色,它允许不同速度的硬件装置来沟通,而不需要依赖于 CPU 的大量中断负载。否则,CPU 需要从来源把每一片段的资料复制到暂存器,然后把它们再次写回到新的地方。在这个时间中,CPU 对于其他的工作来说就无法使用。

    三、消息传输一致

    Kafka提供3种消息传输一致性语义:最多1次,最少1次,恰好1次。

    最少1次(at least once):可能会重传数据,有可能出现数据被重复处理的情况。消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常或者consumer失效,导致保存offset操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息;

    最多1次(at most once):可能会出现数据丢失情况。消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中consumer进程失效(crash),导致部分消息未能继续处理.那么此后可能其他consumer会接管,但是因为offset已经提前保存,那么新的consumer将不能fetch到offset之前的消息(尽管它们尚没有被处理);

    恰好1次(exactly once):并不是指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的情况。

    "Kafka Cluster"到消费者的场景中可以采取以下方案来得到“恰好1次”的一致性语义:

    最少1次+消费者的输出中额外增加已处理消息最大编号:由于已处理消息最大编号的存在,不会出现重复处理消息的情况。

    四、副本

    kafka中,replication策略是基于partition,而不是topic;kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定。leader处理所有的read-write请求,follower需要和leader保持同步.Follower就像一个"consumer",消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具有良好的网络环境.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.

    五、消息存储文件的格式

    每个log entry格式为"4个字节的数字N表示消息的长度" + “N个字节的消息内容”;每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置…每个partition在物理存储层面,有多个log file组成(称为segment).segment file的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.

    获取消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可。

    producer发message到某个topic,message会被均匀的分布到多个partition上(随机或根据用户指定的回调函数进行分布),kafka broker收到message往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息consumer才能消费,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。

    每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。

    segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件. segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个全局partion的最大offset(偏移message数)。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。 每个segment中存储很多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。

    下图对segment file文件为例,说明segment中index<—->data file对应关系物理结构如下:

    索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。其中以索引文件中 元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移 地址为497

    六、如何利用zookeeper

    kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)

    Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册自己的节点信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除.

    Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode.

    Consumer and Consumer group: 每个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了"负载均衡".一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上.

    Consumer id Registry: 每个consumer都有一个唯一的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.

    Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中最大的offset.此znode为持久节点,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费.

    Partition Owner registry: 用来标记partition正在被哪个consumer消费.临时znode。此节点表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)

    当consumer启动时,所触发的操作:

    A) 首先进行"Consumer id Registry";

    B) 然后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其他consumer的"leave"和"join";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).

    C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.

    总结:

    Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每个partition leader建立socket连接并发送消息.

    Broker端使用zookeeper用来注册broker信息,已经监测partition leader存活性.

    Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息。

    七、主从切换

    Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。

    如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader.但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader.必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leader.Kafka并不是使用这种方法。

    Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。

    一个邪恶的想法:如果所有节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦所有节点都down了,这个就不能保证了。

    实际应用中,当所有的副本都down掉时,必须及时作出反应。可以有以下两种选择:

    等待ISR中的任何一个节点恢复并担任leader。

    选择所有节点中(不只是ISR)第一个恢复的节点作为leader.

    这是一个在可用性和连续性之间的权衡。如果等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。如果等待ISR意外的节点恢复,这个节点的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它可能还没同步到。Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。

    这种窘境不只Kafka会遇到,几乎所有的分布式数据系统都会遇到。

    Processed: 0.009, SQL: 9