05 Confluent

    技术2023-08-02  77

    文章目录

    CHAPTER 5 Kafka Internals kafka内部原理Cluster MembershipThe Controller 控制器Replication 副本复制Finding the Preferred Leaders 找到合适的leader Request Processing 请求过程Produce Requests 生产者请求Fetch Requests 获取消息请求Other Requests 其他请求 Physical Storage 物理存储Partition Allocation 分区分配Mind the Disk Space 注意磁盘空间 File Management 文件管理File Format 文件格式Indexes 索引Compaction 压缩How Compaction Works 压缩如何工作Deleted Events 删除事件When Are Topics Compacted? 何时开始compacted Summary 概要

    CHAPTER 5 Kafka Internals kafka内部原理

    为了在生产环境中运行kafka或者编写使用它的应用程序,并不一定要理解kafka的内部原理。然而,理解kafka的工作原理,有助于故障排查,理解kafka的工作行为。具体代码实现细节本书不做深入描述,但是,kafka有关的从业人员,必须关注如下三个内容:

    kafka的副本机制是如何工作的kafka如何处理来自生产者和消费者的请求kafka的数据存储,如文件格式和索引 深入的理解这些原理将对kafka的性能调优非常有帮助。这将有助于更精确的控制而不是随机摆弄。

    Cluster Membership

    kafka使用Apache Zookeeper来维护当前的集群成员broker列表。每个broker都有一个唯一的表示符,该标识符可以在broker配置的文件中设置,也可以自动生成。每次broker启动的时候,它都会通过创建一个临时节点,在zookeeper中使用其ID注册自己。不同的是,kafka的broker组件订阅zookeeper中的/brokers/ids路径,broker在这个路径上注册。在添加和删除的时候会收到通知。 如果尝试启动具有相同id的另外一个broker,将会出现一个错误,新的broker已经注册。这会导致启动失败。因为我们已经为相同的id创建了一个zookeeper节点。 当broker失去与zookeeper的连接时,(通常是由于broker停止,但也可能是由于网络故障或者长时间GC导致暂停),broker启动时创建的临时节点将自动从zookeeper中移除,监视broker列表的kafka组件将收到broker消失的通知。 即使broker停止时表示broker的临时节点消失,broker的ID依然存在于其他数据结构中。例如,每个topic的副本列表(Replication)包含副本的broke ID,这时,如果你完全丢失一个broker,并且使用旧的broker的ID启动一个全新的broker,它将立即加入集群,以替换丢失的broker,并为其分配相同的分区和topic。

    The Controller 控制器

    控制器是kafka的broker之一,除了通用的broker功能外,它还负责对分区选举(我们将在下一节讨论)。通过zookeeper中创建一个临时节点/controller,集群中启动的第一个broker成为控制器。当其他broker启动的时候,他们也尝试创建节点,但是收到一个“node already exists”异常,这会使这些broker意识到控制节点已经存在,并且在broker中已经有了一个控制器。broker在控制节点上创建一个zookeeper的监视,这样旧可以通知它们该节点的更改。通过这种方式,我们可以保证集群一次只有一个控制器。(通过zookeeper实现) 当控制器broker停止或者失去与zookeeper的连接时,临时节点消失。属性将通过zookeeper通知集群中其他的broker控制节点已经消失,可以在zookeeper上创建一个新的控制器节点。第一个收到通知的节点将创建一个新的控制器。当其他的borker也试图创建新的控制节点时,将会收到“node already exists”异常。每当一个控制节点被选举出来,它就会收到一个更高的控制器epoch数字,这个数字通过zookeeper增加。broker知道当前的控制器epoch,如果它们从一个旧的控制器接收到了一个比较旧的数字,则会主动忽略这个旧的控制器。 当控制器注意到一个broker离开集群时,(通过监控zookeeper上的node路径)它知道broker上有leader的所有分区都需要一个新的leader。它检查需要新的leader的所有分区,缺定应该由谁来担任新的leader(即该区的副本列表中的下一个副本)并向包含这些分区的leader或者现有的followers的所有broker发送请求。该请求包含关于分区的新的leader和followers信息。每一个leader都需要知道开始为客户的生产者和消费者请求服务。而followers都知道它们需要开始复制来自新leader的消息。 当一个控制器注意到一个新的broker加入集群时,它使用brokerID检查该broker上是否存在副本。如果存在,控制器将向新的broker和现有的broker通知更改,新borker上的副本开始复制来自现有的leader的消息。 总而言之,kafka使用zookeeper的临时节点特性来选择控制器,并在节点如何和离开集群时通知控制器。每当控制器注意到节点加入和离开集群时,它负责在分区和副本之间选择leader,控制器使用epoch数字来防止脑裂,避免两个节点都认为它们各自都是控制器。

    Replication 副本复制

    复制是kafka的架构核心,kafka文档中的第一句话就将其描述为分布式、分区、复制的提交日志服务器。复制非常重要,因为单个节点不可避免出现故障,而kafka通过这种方式保证了可用性和持久性。 正如我们已经讨论过的,Kafka中的数据每个都是按topic组织的,每个topic都是分区的,每个分区都是可以有多个副本。把这些副本存储在broker上,每个broker通常存储属于不同topic的分区的数百甚至数千个副本。 有两种类型的副本:

    Leader replica 每个分区都有一个指定为leader的副本,为了保证一致性,所有的生产者和消费者的请求都要经过leader副本。Follower replica 一个分区的所有非leader副本都称为follower副本。follower副本部位客户请求服务,它们唯一的工作就是复制来自leader的信息,并且及时了解leader掌握的最新信息,在分区的leader的副本崩溃的情况下,一个follower副本将被提升为该分区的新的leader。

    leader负责的另外一个任务就是知道哪些follower副本是与leader同步的。follower副本试图保持更新通过复制所有leader收到的消息,但是它们由于各自原因不能保持同步,诸如网络拥塞时减慢复制或者当broker崩溃时所有副本broker开始落后,直到我们启动broker能够开始复制。 为了与leader保持同步,副本发送leader Fetch请求,与消费者为了消费消息的请求完全相同。 为了响应这些请求,leader将消息发送给副本。哪些fetch请求包含副本接下来想要接收的消息的offset。并且始终都是有序的。 一个副本将请求消息1,之后是消息2、消息3。在它获得之前的所有消息之前,它不会请求消息4。这意味着leader可以知道当副本请求消息4的时候,副本已经获得了消息3之前的所有消息,通过查看每个副本请求的最后offset,leader可以知道每个副本后面还有多远,还有多少数据没有同步。如果要给副本在超过10秒没有请求消息,或者如果它已经请求消息,但是在超过10秒内没有赶上最近的消息,则认为该副本不同步。如果一个副本跟不上leader,当leader故障之后,它就不能成为新的leader,毕竟,它没有包含全部的消息。 与此相反的是,不断请求最新消息的副本称为同步副本(in-sync replicas. ISR ),只有同步副本在现有leader失败之后才有资格被选为分区leader。 在被认为不同步之前,一个follpwer可以初余不活动或者落后的时间是由副本的配置参数replica.lag.time.max.ms决定的。这使得延迟对leader选举期间客户的行为和数据保留有影响。我们将在第6章可靠性保证时对此进行深入讨论。 除了当前的leader之外,每个分区都有一个首选的leader,最初在创建topic时的leader副本,它是首选的,因为在第一次创建分区的时候,leader要在broker之间进行平衡(稍后将介绍broker之间分配副本和领导者的算法)。因此,我们期望当首选的leader确实是集群中所有分区的leader时,负载将在broker之间保持平衡。默认情况下,kafka配置的auto.leader.rebalance.enable=true,它将检查首选leader副本是否不是当前领导者,但是同步的,并触发leader选举。使首选leader成为当前的领导者。

    Finding the Preferred Leaders 找到合适的leader

    确定当前首选leader副本的最佳办法就是查看一个分区的副本列表,你可以通过kafka-topic.sh工具的输出中查看到分区副本的详细信息,列表的第一个分区就是首选分区。我们将在第十章讨论所有的工具集。无论谁是当前的leader,即使使用副本重新分配工具将副本重新分配到不同的broker,这都是正确的。事实上,如果你手动重新分配副本,重要的是你要记住,你首先指定的将变成首选分区。所以,确保把这些分配给不同的broker,以避免在其他broker没有完成它们工作的时候,一些broker和leader会负载过大。

    Request Processing 请求过程

    kafka的broker的大部分工作,是处理从客户机、分区副本和控制器发送到分区leader的请求。kafka有一个二进制协议,它指定请求的格式以及代理如何响应,请求成功处理或broker在处理请求时遇到的错误。broker总是启动连接并发送请求,broker处理请求并响应它们。从特定客户机发送到broker所有请求都按接收它们的顺序进行处理,正是这种保证机制,使得kafka能够做为消息队列运行,并为其存储的消息提供排序保证。 所有的请求有一个标准的消息头,包括:

    请求类型(API密钥)请求版本(这样broker可以处理不同版本的客户机的响应)相关ID,唯一标识请求的数字,也出现在响应的错误日志中,ID用于故障排除。客户端ID,用于标识发送请求的应用程序。 我们不会在这里描述协议,因为它在kafka文档只有非常详细的描述。但是,了解一下broker如何处理i请求是很有帮助的-稍后,我们讨论如何监视kafka各种配置选项时,你就会指定指标和配置参数指的是哪些队列和线程。 对于broker监听每个端口,broker运行一个acceptor线程,该线程创建连接并将其移交给处理器线程进行处理。线程的数量(也称为网络线程)也是可配置的。网络线程负责从客户端连接获取请求,将它们放在请求队列中,从响应队列获取响应并将它们发送回客户端。参见如下图: 生成请求和获取请求都必须发送到分区的leader的副本,如果broker接收到特定分区生成的请求,并且该分区的leader位于另外一个broker上,那么发送生成请求的客户机将得到一个错误的响应,即"not a leader parttion"。如果某个特定分区的获取请求到达了没有该分区的leader的broker,也会发生同样的错误。kafka的客户端负责向包含相关分区leader的broker发送生成和获取请求,broker包含请求的相关分区leader。 broker如何知道将请求发送到哪里,kafka客户端使用的了另一种称为元数据请求的请求类型。它包括客户机感兴趣的topic列表。服务器响应指定topic中存在哪些分区,每个分区的副本以及哪个副本的leader。元数据请求可以发送到任何broker,因为所有broker都有包含此信息的元数据缓存。 客户端通常缓存此信息,并使用它将生成和获取请求直接发送到每个分区的正确broker。偶尔也需要刷新此信息,(刷新时间间隔由metadata.max.age.ms配置),通过发送另一个元数据请求,它们可以知道topic元数据是否发生了变化。例如,是否添加一个新的broker或将一些副本移动到一个新的broker。另外,如果客户端收到一个请求的"not a leader"错误,它将在尝试再次发送请求之前刷新他的元数据,因为错误表明客户端正在使用过时的信息并正在向错误的broker发送请求。

    Produce Requests 生产者请求

    正如我们在第三章中看到的,一个名为acks的配置参数是需要我们在消息被认为是成功写入之前确认收到消息的broker数量。生产者雷配置为当消息仅被领导者(acks=1)接收,所有同步副本时,将消息视为“written uccessfully”(ack=all),或者消息在不等待broker接收的情况下发送的时刻。(acks=0)。当包含分区的主副本的broker收到该分区生成请求时,他将首选运行几个验证:

    发送数据的用户对topic有写权限吗?请求中指定的ack数是否有效(只允许0、1和all)

    如果acks设置为all,是否有足够的同步副本客户写入消息?如果同步副本的数量低于可配置的数量,可以配置broker来拒绝新消息,我们将在第6章,我们讨论kafka的可靠性保证。

    然后将新消息写入本地磁盘,在linux上,消息被写到文件系统缓存中,不能保证什么时候写到磁盘。kafka不等待数据被持久化到磁盘上,它依靠复制老保持消息的持久性。

    一旦消息被写入到分区的leader,birker检查ack配置,如果ack被设置为0或者1,broker将立即响应。如果acks设置为all,则将请求被存储在要给名为purgatory的缓冲区,直到leader发现follower副本复制了消息,然后向客户端发送响应。

    Fetch Requests 获取消息请求

    broker处理消费者获取数据请求的方式与处理生产者发送生产请求的方式非常相似。客户端发送一个请求,请求,包括broker从topic,分区和offset。类似于,请向我发送从topic TEST的分区0 offset为53开始的消息和从topic TEST 分区3 offset为64开始的消息。 客户端还指定broker可以为每个分区返回多少数据的限制。这个限制很重要,因为客户端需要分配内存来保存从broker发回的响应。如果没有这个限制,broker返回的响应可能过大导致客户端内存耗尽。 正如我们之前讨论的,请求必须到达请求中指定的分区leader,客户端发送必要的元数据的请求,以确保正确的路由fetch请求。当leader收到请求时,它首先检查请求是否有效,这个特定的分区的offset是否存在?如果客户端请求的消息太旧,以至于分区中已经将这些数据删除。或者请求的offset还不存在,broker将响应一个错误。 如果offset存在,broker将从分区读取消息,直到客户端在请求中设置的上限,然后将消息发送给客户端。众所周知,kafka使用 zero-copy方法将消息发送给客户端。这意味着kafka将消息从文件或者更可能的是linux的文件系统缓存,直接发送到网络通道,而不需要任何中间缓存。这与大多数数据库不同,这些数据库中,数据在响应给客户端之前,存储在本地缓存中。这种技术消除了在内存中复制字节和管理缓冲区的开销,从而大大提高了性能。 除了设置broker可以返回的数据上限之外,客户端还可以设置返回数据的下限。例如,将下限设置为10k是客户端告诉broker,只有至少有10k字节要响应时才返回结果的方法。当客户端读取的topic没有太多流量的时候,这是降低CPU和网络利用率的好办法。 替换客户端以每毫秒的时间周期去请求数据,得到很少或者没有数据。在客户端发送一个请求,broker等待,直到有足够数量的数据并返回数据,然后客户端再继续发送下一次请求获取更多的数据。读取的数据总量相同,但是返回的数据的次数更少,因此系统开销尤其是网络开销会更低。 当然,我们不希望客户端永远等待broker提供自购的数据,过一段时间滞后,就可以直接获取已存在的数据并进行处理,而不是等待更多的数据。因此客户端还可以定义一个超时时间来告诉broker,如果你没有足够的数据,那么当达到x毫秒之后,就进行响应。 需要注意的是,并不是所有leader上的数据都可以让客户端读取。大多数客户端智能读取写入到所有同步副本的消息,(follower副本,即使它们是使用者,也不受此影响,否则复制无法工作)。我们已经讨论过,分区的leader直到哪些消息被复制到了哪个副本,直到一条消息被写入所有的同步副本,它才会被发送给消费者。试图获取这些消息将导致一个空响应,而不是一个错误。 这种行为的原因是没有复制到足够副本的消息被认为是不安全的,如果leader奔溃,另外一个副本取代了它,则这些消息将会丢失。如果我们允许客户端读取只存在于leader的消息,我们就会看到数据不一致的行为。例如,如果消费者读取了一条消息,leader崩溃了并且没有其他的broker有此条消息的话,则该消息会丢失。使用其他消费者将无法读取此消息,这可能导致与已读此消息的使用者不一致,相反,我们等待直到所有同步副本获得此消息,然后才允许消费者读取它。这种行为还意味着,如果由于某种原因,broker之间的复制速度比较慢,那么新消息到达消费者的时间将花费更长,因为我们首先要等待消息复制。此延迟仅限于replica.lag.time.max.ms的配置,副本在复制新消息的延迟时间,在这个时间之内该副本任然被认为是同步的。

    Other Requests 其他请求

    我们刚刚讨论了kafka的客户端使用最常见的请求类型,元数据,生产和获取。需要重点说明的是,我们讨论的是在网络上客户端使用的通用二进制协议。 kafka包括由apache贡献者实现的维护java客户端,也有其他语言的客户端,包括C、python、GO等。你可以在apache网站上看到完整列表,它们都使用了此协议与kafka的broker通信。 此外,同样的协议用于kafka的broker本身之间的通信,这些请求时内部的,不应该被客户端使用。例如,当一个控制器宣布一个分区有一个新的leader的时候,他会发送一个leader的AndIsr请求。这样leader就会直到开始接收客户端的请求,和follower就会直到它们新的leader。 kafka协议目前能处理20种不同类型的请求,以后还会添加更多,协议在不断发展,随着我们增加更多的客户端功能,我们需要发展协议来匹配。例如在过去kafka的消费者使用apache zookeeper来跟踪它们从kafka中收到的补偿。因此,当消费者启动的时候,可以检查zookeeper从分区读取的最后一个offset,并直到从哪里开始处理。由于各种原因,我们决定停止使用zookeeper来存储这些。而是将offset存储到特定的分区中。为了做到这一点,我们必须向协议中添加几个请求,OffsetCommitRequest,OffsetFetchRequest,和ListOffsetRequest。限制,当应用程序调用commitOffset()和客户端API的时候,客户端不再写消息到zookeeper中,相反,它发送offsetCommitRequest到kafka。 topic创建的命令行工具任然是在zookeeper中添加数据的,broker通过监控zookeeper中的列表知道哪些新的topic被添加。我们正在改进kafka,添加Create TopicRequest,这将允许所有的客户端通过直接访问kafka来创建topic。(对哪些没有zookeeper库的语言也能很好的支持)。 除了通过添加新的请求类型来演进协议之外,我们有时还选择修改现有的请求来添加一些功能。例如,在0.9.0.0和0.10.0.0,我们决定在元数据响应中添加信息,让客户端知道当前的控制器是谁。因此,我们向元数据请求和响应添加了一个新版本,现在,0.9.0.0的客户端发送版本0的元数据请求(因为版本1在0.9.0.0中不存在),而broker无论是0.9.0.0还是0.10.0.0都知道如果进行响应,版本0不会返回控制信息。这非常好,因为0.9.0.0不期望控制器信息,它不知道如何解析。但是如果你有0.10.0.0的客户端,它将发送一个版本1的元数据请求,而0.10.0.0的broker则将使用包含控制器信息的版本1进行响应,0.10.0.0的客户端可以使用。如果0.10.0.0向0.9.0.0的broker发送版本1的元数据请求,broker将不知道如何处理更新版本的请求,并将响应错误。这就是我们建议在升级客户端之前升级broker的原因,新的broker知道如何处理旧的请求,但反之则不然。 在版本0.10.0.0中,我们添加了ApiVersionRequest,它允许客户端询问broker支持每个请求哪个版本,并相应地使用正确版本。正确的使用此新功能的客户端将能够通过使用它们所连接的broker所支持的协议版本与旧的broker进行通信。

    Physical Storage 物理存储

    kafka的基本存储单元是一个分区副本,分区不能在多个broker之间分隔,甚至不能在同一broker上的多个磁盘进行分隔。因此,分区的大小受到单个挂载点上的可用空间的限制,如果使用JBOD配置,挂载点将包含单个磁盘,如果使用RAID,则可以包含多个磁盘。参见第二章。 在配置kafka的时候,管理有定义将存储分区的目录列表,这就是日志。dirs参数不需要与kafka存储的错误日志的配置为止混淆,错误日志是在log4j中进行配置的。通常的配置包括kafka将使用每个挂载点的目录。 让我们看看kafka如何使用可用目录来存储数据。首先,我们想了解如何将数据分配给集群中的broker和broker中的目录。然后我们将了解broker如何管理文件,特别是如何处理保留保证。然后,我们将深入文件查看文件和索引的各种,最后,我们将介绍日志压缩,允许将kafka转换为长期数据存储的高级特性。并描述它是如何工作的。

    Partition Allocation 分区分配

    创建kafka topic的时候,kafka首先决定如何在broker之间分配分区。假设你有6个broker。并决定创建一个具有10个分区和副本因子为3的topic。kafka现在有30个分区副本可以分配给6个broker。当进行分配时,目标是:

    为了在多个broker之间均匀分布副本,在我们的示例中,我们将每个broker分配5个副本。为了确保每个分区的每个副本都位于不同的broker上。如果分区0在broker上有2个leader,我们可以在broker3和broker4上分配follower,但是不能在broker2和broker3同时分配follower。如果broker有机架信息,可以在kafka版本0.10.0.0之后的版本中获得,则尽可能将每个分区的副本都分配到不同的机架。这确保了一旦整个机架都停机的事件不会导致分区完全不可用。

    为此,我们从一个随机的broker比如说4开始,以循环方式为每个broker分配分区,以确定leader的位置。因此,分区leader0将位于broker4上,分区1的leader将位于broker5上,分区2将位于broker0上,因为我们有6个broker,以此类推。然后,对于每个分区,我们将副本与leader的offset增加。如果分区的leader0在broker4上,第一个follower将在broker5上,第二个follower将在broker0上,分区1的leader位于broker5上,因此第一个副本位于broker0上,第二个副本位于broker1上。 当考虑到机架感知时,我们将准备一个机架交替的broker列表,而不是按照数字顺序选择broker。假设我们知道broker为0,1和2,我们不是按照0到5的顺序选择broker。而是按照0,3,1,4,2,5的顺序排列。每个broker的后面跟着一个来自不同机架的broker,如下图。在这种情况下,如果分区0的leader在broker4上,那么第一个副本将在broker2上,它在一个完全不同的机架上,会很好的避免一旦机架1离线,我们知道任然有一个可用的副本。因此分区任然可用。这对于我们所有的副本都是正确的,我们保证了在机架失败时候的可用性。

    Mind the Disk Space 注意磁盘空间

    需要注意,向broker分配分区时如果没有考虑可用的空间或者现有负载,而向磁盘分配分区时考虑分区的数量,而不是分区的大小。这意味着,如果某些broker比其他broker拥有更多的磁盘空间(可能因为集群混合了较老的和比较新的服务器),一些分区非常大,或者同一个broker上有不同大小的磁盘,那么你需要特别小心你的分区分配。

    File Management 文件管理

    在kafka中,文件的留存是一个重要的概念。kafka不会永远保留数据,也不会等到所有用户读取之后才将文件删除。相反,kafka的管理员会为每个topic分配一个保留期,在删除消息之前存储消息的事件,或者在清除旧消息之前存储多少数据。 因为在大文件中查找需要清楚的消息并删除文件的一部分即费时又容易出错,所以我们将每个分区分隔成多个段,默认情况下,每个段要么包含1G的数据,要么包含一周的数据。kafka的broker写入一个分区时,如果达到了段的限制,我们将关闭文件并开始一个新的分区。 当前写入的段称为活动段,活动是永远不会删除的,所以日过你将日志设置为只存储一天的数据,但是每个段包含了5天的数据,你实际上会保留5天的数据,因为在关闭之前我们不会删除活动段。如果你选择的存储一个星期的数据并每天滚动一个新的段,你将看到我们将滚动一个新段,同时删除最老的段,因为大多数时候,分区将有7个段。 正如你在第二章中了解到的,kafka的broker将为每个分区中的每个段保留一个打开的文件句柄。甚至是不活动段。这通常会导致打开的文件句柄数量过多,因此操作系统必须相应的进行调优。

    File Format 文件格式

    每个段存储在单个数据文件中,在文件中,我们存储了kafka消息和它们的offset。磁盘上数据的格式与我们从生产者发送给broker以及稍后从broker发送给消费者的格式相同。在磁盘上和网络上使用相同的格式使得kafka能够在发送消息给消费者时使用zero-copy优化。并且避免对生产者已经压缩的消息进行解压缩和再压缩。 除了key,value和offset之外,每个消息还包含消息的大小,允许我们检测错误的校验和代码,吧iOS消息格式版本的魔法字节,压缩编码器(Snappy、GZIP、或者LZ4)和事件戳(在0.10.0版本中添加)。事件戳在消息发送时由生成器提供,或者在消息到达时由broker提供–具体取决于配置。 如果生产者正在发送压缩消息,那么单个生产者批处理中的所有消息将被压缩在一起,并作为包装器消息的值发送。如下图,因此,broker接收单个消息,并将其发送给消费者。但是当消费者解压缩消息时,它将看到批处理中包含的所有消息,以及它们自己的时间戳和offset。 这意味着如果你在生成器上使用压缩,(极力推荐)发送更大批次的消息能降低网络和磁盘开销。这也意味着,如果我们决定改变消费者使用的消息格式,如添加要给时间戳消息,那么协议和磁盘存储的格式都需要修改。kafka broker需要知道如何处理这个包含两种格式的变更。 kafka的broker与dumpLogSegment工具一起提供,它允许你查看文件系统中的分区段并检查其内容。它将显示每条消息的offset,校验和,魔法字节,大小和压缩编解码器。你可以如下运行该工具:

    bin/kafka-run-class.sh kafka.tools.DumpLogSegments

    如果你选择 deep iteration参数,它将显示包装器消息中压缩的消息的信息。

    Indexes 索引

    Kafka允许消费者开始从任何可用的偏移量获取消息,这意味着,如果消费者请求从offset100开始的1MB消息,broker必须能够快速定位offset为100的消息,(该消息可能在分区中的任何段中),并从该offset开始读取消息。为了帮助broker快速定位给定offset的消息。kafka为每个分区为维护了要给索引。索引将offset映射到段文件和文件中的位置。 索引也被分割成段,因此我们可以在清楚消息时删除旧索引,kafka不试图维护索引的校验和。如果索引损坏,只需要重新读取消息并记录offset的位置,就可以从匹配的日志段重新生成索引。管理员在必要时删除索引也是完全安全的,它可以自动生成。

    Compaction 压缩

    通常,kafka将存储一段时间的消息,并清楚比保留期更早的消息,但是,假设你的kafka为客户存储的送货地址。在这种情况下,存储每个客户的最后一个地址比存储上周或者去年的数据更有意义。这样你就不必担心旧地址,并且你仍然保留了一段时间没有搬家的客户的地址。另外一个用例可以是使用kafka存储其当前状态的应用程序。每次状态改变时,应用程序都会将新状态写入kafka。当从崩溃中恢复时,应用程序从kafka读取这些消息来恢复它的最新状态。在这种情况下,它只关心崩溃前的最新状态,而不是运行时发生的所有更改。 kafka支持这样的用例,它允许topic上保留策略为delete(删除比保留时间更早的消息)。而compact(只存储topic每个key的最新值)。显然,只有应用程序为topic生成既包含key又包含value的事件时,将策略设置为compact才有意义。如果topic包含空的key,压缩将失败。

    How Compaction Works 压缩如何工作

    每个日志都被分为两部分:

    Clean 以前压缩过的消息,此部分仅为每个key包含一个值,即之前的压缩时的最新值。Dirty 在上次压缩之后编写的消息。 当kafka启动的时候,是否启用了压缩,(使用log.cleaner.enabled配置),每个broker将启动要给压缩管理器线程和许多压缩线程。它们负责执行压缩的任务。每个线程选择的脏消息占总分区大小比例最高的分区,并清除该分区。 为了压缩分区,清理线程读取分区的dirty部分,并创建一个内存的map,每个map的调没由消息key的16字节hash后具有相同key的前一条消息的8字节的offset组成。这意味着每个映射条目只使用24字节。如果我们看到一个1GB的段和假设每个消息段占用1KB。分区将包含100万个这样的消息,我们只需要将一个24MB的map压缩到这个段(我们可能需要很多key重复的消息,如果我们将重用相同hash的话通常会降低内存的使用)这是相当有效的。 在配置kafka的时候,管理员配置线程可以为这个offset的map使用多少内存压缩,尽管每个线程都有自己map,但是配置时针对的所有的线程的总内存。如果你为压缩的offset map配置了1GB内存,并且有5个clean状态的线程,那么每个线程将获得200MB为自己的offset map。kafka不要钱分区的整个dirty部分都适合这个map分配的内存大小,但是至少要适配要给完整的段。如果没有,kafka将记录要给错误。管理员将为这个offset分配更多的内存,或者使用更少的clen线程。如果只有几个段适合,kafka将开始压缩最老的分段到map,其余的仍然时dirty,等待下一次压缩。 清理线程构建offset的map之后,它将从最老的段开始读取clean的段,并根据offset的map检查它们的内容,对于每个消息,它检查该消息的key是否存在offset的map中,如果该key在映射中不存在,那么我们刚刚读取的消息的值仍然是最新的,我们将该消息复制到替换段。如果该key确实存在于map中,如果忽略该消息,因为分区中稍后有一条相同key但是比较新的消息。一旦我们复制了所有任然包含其key的最新值的消息,我们可以将原始的段替换,在这个过程中,每个key只剩下一条消息,具有最新值的消息。如下图:

    Deleted Events 删除事件

    如果我们总是保存每个key的最新消息,那么当我们真的想删除某个特点key所持有的消息的时候,比如如果一个用户离开了我们的服务,而我们有义务从系统中删除该用户的所有痕迹,那么我们该如何处理? 为了从系统中完全删除一个key,甚至不保存最后一条消息,应用程序必须生成一条包含该消息的key和空值的消息。当cleaner线程发现这也的消息的时候,它首先进行常规的压缩,只保留空值消息。它将保留这个特殊的消息(墓碑)一段可配置的时间。在此期间,消费者能够看到此消息并知道该值被删除。因此如果消费者将数据从kafka复制到数据库,它将看到墓碑消息,并且知道将用户从数据库中删除。在设置时间之后,cleaner的线程将删除墓碑消息,key将从kafka的分区中消失。给消费者足够的时间看到墓碑消息是很重要的,因为如果我们的消费者错过了墓碑消息,它会看不到消费时的关键信息,因此不知道从kafka或者数据库中将其删除。

    When Are Topics Compacted? 何时开始compacted

    就像删除策略永远不会删除当前的活动段一样,压缩策略也永远不会压缩活动段。只能在非活动段上压缩消息。 在0.10.0之前的版本,当50%的topic包含dirty记录时,kafka将开始压缩。目标是不要过于压缩(因为压缩会影响topic的读/写性能),单也不要留下太多dirty记录(因为会消耗磁盘空间)。当磁盘空间达到50%以上,将通过一个topic一次性压缩似乎是一个合理的选择。这个配置可以通过管理员进行调优。在未来的版本中,我们计划增加一个宽期限,在此期间我们保证消息将保持在未压缩状态。这将允许需要查看写入topic的每条消息的应用程序有足够的时间确保它们确实看到了这些消息,即便它们有些滞后。

    Summary 概要

    对于kafka更深层次的原理显然超出了本章的范围,但是我们希望本章的内容能让你体验到我们在项目中所设计的决策何优化,并可能解释你在使用kafka时遇到的一些更模糊的行为何配置。 如果你真的对kafka内部感兴趣,那么最好的办法就是阅读源码。kafka开发人员邮件列表是dev@kafka.apache.org.这是要给非常友好的社区,总是有人愿意回答关于kafka是如何工作的问题。当你阅读源码的时候,或许可以修复一个或者两个bug。开源项目总是欢迎你的贡献。

    Processed: 0.009, SQL: 9