06 Confluent

    技术2025-09-01  4

    文章目录

    CHAPTER 6 Reliable Data Delivery 数据传输可靠性Reliability Guarantees 可靠性保证Replication 复制Out-of-Sync Replicas不同步的副本 Broker Configuration broker配置Replication Factor 副本因子Unclean Leader Election 不干净的leader选举Minimum In-Sync Replicas 最低同步副本 Using Producers in a Reliable System 生产者可靠性Send Acknowledgments 发送ackConfiguring Producer Retries 配置生产者重试Additional Error Handling 添加错误处理 Using Consumers in a Reliable System 消费者的可靠性Committed Messages Versus Commited Offsets 已提交的消息和offset Important Consumer Configuration Properties for Reliable Processing 用于可靠性的重要配置参数Explicitly Committing Offsets in Consumers 精确的commit 消费者的offsetAlways commit offsets after events were processed 始终在处理消息后提交offsetCommit frequency is a trade-off between performance and number of duplicates in the eventMake sure you know exactly what offsets you are committing 确保你清楚精确的commit offset是什么Rebalances 重平衡Consumers may need to retry 消费者可能也需要重试Consumers may need to maintain state 消费者需要维护状态Handling long processing times 处理较长的处理时间Exactly-once delivery 精确一次传递 Validating System Reliability 系统可靠性验证Validating Configuration 配置验证Validating Applications 验证应用程序Monitoring Reliability in Production生产环境可靠性监控 Summary 总结

    CHAPTER 6 Reliable Data Delivery 数据传输可靠性

    可靠的数据传输是系统的属性之一,不能在事后考虑,就像性能一样,它必须从最初的白板图设计成一个系统,你不能事后把系统抛在一边。更重要的是,可靠性是系统的属性,而不是单个组件的属性,因此即使在讨论apache kafka的可靠性保证时,也需要考虑其各种场景。当谈到可靠性的时候,与kafka集成的系统和kafka本身一样重要。因为可靠性是一个系统问题,它不仅仅是一个人的责任。每个卡夫卡的管理员、linux系统管理员、网络和存储管理员以及应用程序开发人员必须共同来构建一个可靠的系统。 Apache kafka的数据传输可靠性非常灵活。我们知道kafka有很多用例,从跟踪网站点击到信用卡支付。一些用例要求最高的可靠性,而另外一些用例优先考虑四度和简单性而不是可靠性。kafka被设计成足够可配置,它的客户端API足够灵活,允许各种可靠性的权衡。 由于它的灵活性,在使用kafka时也容易意外地出现错误。相信你的系统是可靠的,但是实际上它不可靠。在本章中,我们将讨论不同类型的可靠性以及它们在apache kafka上下文中的含义开始。然后我们将讨论kafka的复制机制,以及它如何有助于系统的可靠性。然后我们将讨论kafka的broker和topic,以及如何针对不同的用例配置它们。然后我们将讨论客户,生产者、消费者以及如何在不同的可靠性场景中使用它们。最后,我们将讨论验证系统可靠性的主体,因为仅仅相信一个系统的可靠是不够的,必须彻底的测试这个假设。

    Reliability Guarantees 可靠性保证

    当我们谈论可靠性时,我们通常谈论的是保证,即系统在不同情况下能够保证保持的行为。最著名的可靠性保证就是ACID,它是关系数据库中普遍支持的标准可靠性保证。ACID代表原子性,一致性、隔离性和持久性。当供应商解释它们的数据库是acid兼容的意思就是数据库保证某些行为的事务性。 这些担保是人吗信任关系数据库和它们最关键应用程序的原因。它们确切的知道系统承诺了什么以及它在不同条件下的行为方式。他们了解担保。并可以依靠这些担保编写安全的应用程序。 对于哪些寻求构建可靠应用程序的人来说,理解kafka提供的保证是至关重要的。这种理解使系统的开发人员能够了解在不同的故障条件下的系统行为方式。那么 apache kafka能保证什么呢 ?

    kafka为分区中的消息提供了顺序保证。如果消息B使在消息A之后编写的,在相同的分区中使用相同的生产者,那么kafka保证消息B的offset将高于消息A,并且消费者在消息A之后读取消息B。当生成的消息被写入到分区的所有同步副本,不一定要刷新到磁盘上,将被认为是提交成功的。生产者可以选择在消息完全提交发送给leader或者通过网络发送时接收已发送的消息确认。只要至少有一个副本保持活跃状态,提交的消息就不会丢失。消费者只能读已提交的消息。

    这些基本的保证可以在构建可靠系统时使用,但其本身并不能使系统完全可靠。构建可靠的系统需要权衡利弊,而构建kafka使为了让管理员和开发人员通过提供配置参数来控制这些权限,来决定他们需要多少可靠性。权衡通常涉及可靠性和一致性以及其他重要的因素。如可用性、高吞吐量、低延迟和硬件成本等重要性。我们接下来回顾kafka的复制机制,介绍术语,并讨论可靠性是如何构建到kafka的。在哪之后,我们回顾刚才提到的配置参数。

    Replication 复制

    kafka的复制机制,为每个分区都提供了多个副本,这是kafka所有可靠性保证的核心。在多个副本中写入消息是kafka在崩溃时提供消息可靠性保证的方法。我们在第五章中深入的解释了kafka的复制机制,我们在此回顾一下重点。 每个kafka的topic被分隔成多个分区,这些分区是基本的数据构建块,分区存储在单个磁盘上,kafka保证分区内的消息的顺序性。分区有在线和离线两种状态。 每个分区可以有多个副本,其中一个副本是leader。所有的消息都由leader副本生成并使用。其他副本只需与leader保持同步,并及时复制所有的最新消息。如果leader不可用,那么其中一个同步的副本将变成leader。 一个副本被认为是同步的,如果它是一个分区的leader,或者它是一个follower。

    与zookeeper有一个session,这个session的时间是6秒,在最后6秒发送一个心跳给zookeeper。这个时间可以配置。在10秒内从leader获取消息,这个时间也是可以配置的。获取leader在最近10秒的最新消息,也就是说,follower仍然从leader哪里得到的消息是不够的,它必须没有延迟。

    如果一个副本失去了与zookeeper的连接,停止获取新消息,或者在10秒内无法跟上,那么这个副本会被认为是不同步的。 当一个不同步的副本在此连接到zookeeper时,它就会恢复同步,并跟上给leader的最新消息,着通常在网络故障恢复之后,很快就会发生。如果存储副本的broker宕机时间比较长的话,这需要一段时间。

    Out-of-Sync Replicas不同步的副本

    看到一个或者多个副本在同步状态和非同步状态之间快速切换,那么集群肯定出现了问题。原因通常是broker上的java GC配置错误。错误的GC会导致broker停止几秒。在此期间,它将失去与zookeeper的连接。当broker失去与zookeeper的连接时,就会被认为与集群不同步,从而导致切换的行为。

    稍微落后一点的同步副本会降低生产者和消费者的速度,因为他们会等待所有副本在提交消息完成之后才能获得消息。一旦这个副本失去同步,我们不再等待它获得消息,它仍然落后,但是现在不会影响性能。但是问题是同步的副本越少的话,分区的有效复制因子就越低,如果出现停机等故障就会存在更高的数据丢失的风险。 在下一章,我们将详细讨论在实践中的影响。

    Broker Configuration broker配置

    broker中有三个配置参数能改变kafka关于可靠消息存储可靠性的行为。与许多broker的配置变量一样,这些变量可以应用于broker级别,控制系统中所有的topic配置,也可以应用到topic级别,控制特点topic的行为。 能够在topic级别控制可靠性意味着要给kakfa集群可以用来托管可靠和不可靠的topic。例如,在银行中,管理有可能希望为整个集群设置非常可靠的缺省值,但是对于存储的某些数据是可以接受丢失的如客户投诉的topic,则应该例外。 让我们逐个讨论这些参数,看看他们是如何影响kafka中消息存储的可靠性以及所涉及的权衡。

    Replication Factor 副本因子

    在topic级别的配置是replication.factor。在broker级别,你可以通过default.replication.factor控制自动创建topic的参数。 在此之前,在全书中,我们一直假设topic的副本因子为3,这意味着每个分区都要在三个不同的broker上复制三次,这是一个合理的假设。因为这事akfka的默认配置。但这也是用户可以修改的配置。即使topic存在之后,你也可以选择添加和删除副本,从而修改副本因子。 副本因子N,则允许你丢失N-1个broker任然可以可靠的对topic进行读写。因此,更高的副本因子会带来更高的可用性、更高的可靠性和更少的灾难。另外一方, 如果副本因子为N,则至少需要N个broker,并且需要存储N份数据的副本,这意味着需要N倍的磁盘空间。我们基本上是在通过空间来换得可靠性保证。 那么如何确定topic的正确副本数量呢,答案取决于一个topic有多重要,以及你愿意为更高的可用性支付多少。这点也取决于你有多偏执。 如果你完全能够容忍在单个broker重启时,(集群的正常操作)某个特定的topic不可用,那么副本因子设置为1就足够了。不要忘记确保你的管理人员和用户也同意这种权衡,这样可以节省磁盘或者服务器,但是你会失去高的可用性。 复制因子为2也意味着丢失一个broker仍然可以正常工作,这听起来似乎已经足够了,但是请记住,丢失一个broker会使集群处于不稳定状态。这可能是个不好的选择。 由于这些愿意,我们建议对于存在可用性的任何topic,复制因子都为3。在极少数情况下,这还被认为不够安全,因此我们曾见过,银行都是使用5个副本,以防万一。 副本放置也非常重要,在默认情况下,kafka将确保分区的每个副本都位于单独的broker上,然而,在某些情况下,这还是不够安全,如果一个分区的所有副本都放在同一机架的broker上,并且机架顶部交换机不正常,那么无论副本因子如何,分区都将失去可用性。 为了防止机架故障,我们建议将broker放置在多个机架上并使用broker.rack配置。broker.rack将配置每个broker的机架名称。kafka将确保分区的副本分布在多个机架上,以确保更高的可用性。在第五章中,我们详细的介绍了kafka如何在broker和机架上放置副本。如果你有兴趣的话可以了解更多。

    Unclean Leader Election 不干净的leader选举

    此配置在broker上实际使整个集群上可用。参数为unclean.leader.election.enable 启用,默认值为true。 如前文所述,当分区的leader不再可用时,将选择一个同步副本做为新的leader。这种leader选举是干净的,因为它保证了提交数据不会丢失。根据定义,提交的数据存在于所有同步副本上。 但是,我们除了当前的故障副本之外没有同步的副本可用怎么办? 这会导致如下两种情况:

    分区有三个副本,两个follower不可用,假定两个broker宕机。在这种情况下,生产者继续写消息给leader。所有的消息都被确认并提交。因为leader是唯一可用的副本。现在让我们假定leader变得不可用。在这个场景中,如果一个不同步的follower启动,我们将有一个不同步的副本做为该分区唯一可用的副本。该分区有三个副本,但是由于网络问题,两个follower落后了,因此即使他们正在允许并进行了复制,他们仍不同步。leader做为唯一的同步副本继续接收消息。现在,如果leader变得不可用,两个可用的副本将不再同步。

    在这两种情况下,我们需要做出一个艰难的选择:

    如果我们不允许不同步的副本成为新的leader的话,分区将保持脱机状态,直到旧的leader重新启动。在某些情况下,这可能需要数小时。如果我们允许不同步副本称为新的leader,我们将丢失在副本不同步的时候写给旧leader的所有消息。并且还会导致一些消费者的不一致。为什么?假设副本0和1不可用时,我们将offset为100-200的消息写入到副本2。现在副本3不可用,而副本0已经恢复,副本0只有0-100的消息,而不是100-200,如果午没允许副本0成为leader,那么它将允许生产者写入新消息,并且允许消费者消费它们。所以,现在新的leader有了新的100-200的消息。但是,我们需要注意的就是某些消费者可能已经消费了100-200的旧消息,有些消费者消费了100-200的新消息。有些则两部分都包含了。现在在查看下游的情况,这会导致非常糟糕的后果。此外,副本2将重新上线。成为新的leaderr的follower。在这个时候,它将删除他之前收到的任何领先于当前leader的消息,这些消息对任何消费者都将不可用。

    总之,如果我们允许不同步副本成为leader,我们将面临数据丢失和数据不一致的风险。如果我们不允许它们成为leader,我们将面临更低的可用性,因为在分区恢复联机之前,必须等待原始的leader变为可用。 设置unclean.leader.election.enable为true,意味着我们允许不同步的副本成为leader,被称为不洁选举。知道当这种情况发生的时候,我们将丢失信息。如果我们将其设置为false,我们选择等待最初的leader重新上线。导致可用性降低。在数据质量和一致性非常重要的系统中,我们通常会看到这个参数被禁用,配置为false。银行系统是一个很好的例子。大多数银行宁愿无法处理行信用卡支付几分钟甚至几个小时也不能容忍付款错误。在可用性更重要的系统中,比如实时的点击流分析,不洁选举参数通常是启用的。

    Minimum In-Sync Replicas 最低同步副本

    在topic和broker级别都有min.insync.replicas。 正如我们所看到,在某些情况下,即使我们将topic配置为三个副本,也可能只剩下一个同步的副本。如果这个副本变为不可用。我们可能不得不在可用性和一致性上做出选择。这从来都不是一个i额容易的选择。请注意,部分问题在于,根据kafka的可靠性保证,当数据写入到同步副本时,才会被认为提交成功。即使所有的副本都意味着只有一个副本,如果该副本不可用,数据旧可能丢失。 如果希望确保将提交数据写入多个副本,则需要将同步副本的最小数目设置为较高的值。如果一个topic有三个副本,你设置min.insync.replicas为2。然后只有在三个副本中至少有两个时同步的情况下,才能写入topic中的一个分区。 当所有三个副本都同步时,一切都正常进行。如果其中一个副本不可用,也会出现这种情况,但是如果三个副本中有两个不可用,broker将不再接受生成请求。相反,尝试发送数据的生产者将收到NotEnoughReplicasException异常。用户可以己洗读取现有数据,实际上,通过这种配置,当个同步副本变成只读,这可以防止数据生成和消费的不良情况,即当不干净选举发生时数据才会消失。为了从这种只读情况中恢复,我们必须使用两个不可用分区中的一个重新可用,可能需要重启broker,并等它追赶上并同步。

    Using Producers in a Reliable System 生产者可靠性

    即使我们以最可靠的配置来配置broker,如果我们不将生产者也做相应的配置,那么系统还是有可能会丢失数据。 不妨看看如下两个案例:

    我们用了三个副本配置了broker,并且禁止了不洁的leader选举,因此我们不应该丢失任何一条给kafka集群发送的消息。但是我们将生产者配置为使用acks=1发送消息。我们从生产者哪里发送了一条消息,它被写到了leader,但是还没同步到副本。leader在告诉生产者,消息已成功写入,之后在数据复制到其它副本之前立即宕机。其他的副本在ISR中,因为系统需要一段时间才能识别不同步的副本。这些不同步的副本之一将会变成leader。但是由于消息没有写入副本,因此它将丢失,但是生产者程序认为这个消息写入成功了。系统时一致的,因为没有消费者看到这条消息,因为副本从未复制到这条消息,但是从生产者的角度来看,这条消息旧永远丢失了。我们配置了三个副本,并且禁止了不洁选举,我们从错误中吸取教训,将acks配置为all。假设我们试图写入一条消息给kafka,但我们正在写的分区leader刚刚宕机,新的分区仍在选举中。kafka会响应:Leader not Available。此时,如果生产者没有正确处理错误,并且没有重试到写入成功,那么消息仍然可能丢失。同样,这不是broker的可靠性问题,因为broker从未得到消息,这也不是一致性问题,因为消费者从未得到消息。但是,如果生产者没有正确处理错误,它们可能会导致消息丢失。

    那么,我们将如何避免这些错误的发生呢?如示例所示,有两件重要的事情时kafka的应用程序的开发者需要注意的:

    使用正确的acks来匹配可靠性要求正确的处理配置和代码中的错误

    我们在第三章中讨论了生产者,在此我们再回顾这一点。

    Send Acknowledgments 发送ack

    生产者可以选择以下三种不同的ack处理模式:

    acks=0 意味着如果生产者没法通过网络发送消息,则认为该消息已成功写入kafka。如果发送的对象不能序列化或者网络失败,你仍然会得到错误,但是如果分区离线或者整个kafka集群决定长期离线,则不会得到任何错误。这意味着,即使在干净的leader选举的情况下,你的生产者也会丢消息。因为在选举新的leader的时候,它不会知道leader不可用,使用ack=0允许会非常快,这事为什么你会看到许多的使用者用这种配置进行基准测试。你可以获得惊人的吞吐量并利用大部分带宽,如果你选择这种配置,你肯定会丢失一些消息。acks=1 意味着leader在收到消息的那一刻会发送一个确认或者一个错误,并将其写入到分区数据的文件(不一定是磁盘)。这意味着leader选举的正常情况下,你的生产者会获得一个LeaderNotAvailableException异常,如果生产者正确处理这个错误,它将重发消息直到消息到达安全的leader。如果leader崩溃,并且在崩溃之前没有将一些已成功写入leader并确认的消息复制给follower,那么你可能会丢失数据。acks=all 这意味着生产者将等待直到所有的同步副本收到消息然后发送一个确认或者错误。在broker上配置min.insync.replica。这允许你配置在确认消息之前有多少副本获得消息。这是最安全的选择,在消息完全提交之前,生产者不会发送消息。这也是最慢的选择,生产者需要等待所有副本获得消息,然后才能将消息批处理标记为完成并继续执行。这种影响可以通过生产者使用异步模式和发送更大的批次来降低,但是这个选项通常会降低吞吐量。

    Configuring Producer Retries 配置生产者重试

    生产者的错误处理分为两部分,生产者为你自动处理以及你做为使用生产者的开发人员手动来处理。生产者可以为你处理broke返回的重试错误。当生产者向broker发送消息时,broker可以返回成功和错误代码。这主要有两类错误代码,可以通过重试解决的和无法解决的错误。例如,broker返回了LEADER_NOT_AVAILABLE,则生产者可以尝试在此发送,此时broker可能已经选择出了一个leader,则第二次尝试将成功。这意味着LEADER_NOT_AVAILABLE时一个可重试的错误。另外以一方面,如果broker返回NVALID_CONFIG,再次重试并不会改变配置,这是一个不可重试的错误。 通常,如果你的目标是消息永不丢失,那么最好的方法是配置生产者使其在遇到可以重试的错误后不断重发,为什么?因为像缺少leader黑哦在网络连接之类的问题通常需要几秒才能解决,如果让生产者自动重发,那么你不需要对此问题做任何处理。我经常被问到,我应该为生产者配置多少次重试?而答案取决于生产者重试出异常之后你打算处理多少次后放弃。如果你的回答是我将捕获异常并再次重试,那么你肯定需要设置更高的重试次数,让生产者继续重试。当你的回答是,我需要删除这个信息,继续重试没有任何意义,或者我将在其他的媳妇写入,后续再处理。请注意,kafka跨数据中心的复制工具MirrorMaker(第八章会介绍)默认的配置为不断重试,因为做为一种高度可靠的复制工具,它永远不应该丢消息。 需要注意的是重试将会导致一个风险,就是两个消息都写入到broker从而导致数据重复。例如,如果网络问题导致broker的回包到达生产者,但是成功的写入和复制了消息,则生产者会把缺少消息确认视为网络临时问题,并将重复发送,因为它不知道已经收到了消息。再这种情况下,broker最终将拥有相同的消息两次,重试和小心的错误处理可以保证每个消息至少存储一些,但是再apache kafka 0.10.0之前,我们不能保证消息只存储一次。许多应用程序为每个消息添加唯一的标识符,以便在消费消失时决策和清理重复消息。其他的应用程序使消息具有idempotent幂等性,即意味着相同的消息重复发送两次也不会对正确性产生影响。例如,消息账户值110 使 幂 等 的 , 因 为 发 送 几 次 都 不 会 改 变 结 果 , 向 账 户 添 加 10 使幂等的,因为发送几次都不会改变结果,向账户添加10 使10则是不幂等的,因为每次发送都会改变结果。

    Additional Error Handling 添加错误处理

    使用内置的生产者重试机制使正确的处理大量错误而不丢失消息的最简单的办法,但是做为开发人员,你仍然需要能够处理其他类型错误的方法。包括:

    不可重试的broker错误,如消息大小错误,授权错误等。在消息发布给broker之前,发生的错误,例如,序列化错误。当生产者程序耗尽所有的重试次数,或者由于在重试时使用所有的内存存储消息,生产者程序所使用的可用内存以达到阈值的错误。

    在第三章中,我们讨论了如何为同步和异步消息发送方法编写错误处理的程序。这些错误处理程序的内容是特定于应用程序及其目标的,要扔掉坏消息吗?登陆错误吗?将这些消息存储在本地磁盘的目录中?触发另外一个应用程序的回调。这些决策将是特定于你的体系结构,请注意,如果你的错误处理程序所做的一切是重试,那么你最好依赖于生产者内部的默认重试功能。

    Using Consumers in a Reliable System 消费者的可靠性

    现在我们已经对kafka内部的可靠性保证和生产者的可靠性,现在是时候看看消费者如何保证了。 正如我们在本章第一部分看到了,数据只有在提交给kafka之后才对消费者可用,这意味着数据被写入到所有副本中,这对消费者得到的数据保证是一致的。消费者要做的唯一一件事情就是确保它们知道哪些消息是被消费过的,哪些消息没有被消费,这事使用消息时不丢失消息的关键。 当从一个分区开始读取数据的时候,消费者正在获取一批数据,检查批中最后的offset。然后从收到的ui后一个offset开始请求另外一个批次。这保证kafka消费者将总是正确的顺序获得新数据,而不会遗漏任何消息。 当一个消费者停止工作的时候,另外一个消费者知道要从哪开始工作,前一个消费者的停止之前处理的最后一个offset是什么?另外一个消费者甚至可以是重启后的消费者。这实际上并不重要。一些消费者将从该分区开始消费,它需要知道是从哪个offset开始。 这就是为什么消费者需要commit它们的offset。对于正在使用的每个分区,消费者存储的是其当前位置,因此它们或者其他的消费者知道在重启后如何继续。消费者丢失消息的主要方式是已读单尚未完全处理的消息的提交的offset。当另外一个消费者开始工作时,它将跳过这些消息,它们永远不会被处理。这就是为什么要非常注意何时以及如何commit是至关重要的。

    Committed Messages Versus Commited Offsets 已提交的消息和offset

    这与提交消息不同的是,提交消息是写入到所有同步副本并可以供其他消费者消费的消息。已提交的offset是消费者发送给kafka的offset,用于确认它们已接收并处理了分区中达到此特定offset的所有消息。 在第四章中我们详细讨论了消费者API,并介绍了许多提交offset的方法,在这里我们将介绍一些重要的注意点和选择。请参阅第四章以连接有关api的详细信息。

    Important Consumer Configuration Properties for Reliable Processing 用于可靠性的重要配置参数

    要为用户配置所需要的可靠性行为保证,需要了解四个非常重要的配置属性: 第一个参数是group.id,正如在第四章中详细解释的那样,基本的思路是,两个消费者如果有相同的group id 和订阅一个相同的topic,每个消费者将非配topic的一个子集。因此只有单独的消费者才会完整的处理一个topic的各个分区。如果你需要消费者子集查看和订阅其主题的每一条消息,那么它将需要一个唯一的group.id 。 第二个相关的配置参数是auto.offset.reset,这个参数控制消费者在没有提交offset或者当消费者请求broker中不存在的iffset时所做的操作。在第4章解释了这是如何发生的。治理只有两个选择,如果选择earliest,消费者将在分区没有offset时从分区的最开始位置开始消费。这可能回导致消费者两次处理相同的消息,但是这可以保证数据丢失的可能性最小。如果你选择latest,消费者将从分区的末尾开始,这将尽量减少消费者重复处理消息,但是几乎肯定的导致消费者错过很多消息。 第三个相关的参数是enable.auto.commit。这是一个重大的决定,是让消费者根据计划为你提交commit,还是计划在代码中手动提交commit?自动offset提交的主要好处是让消费者尽量简单,不需要关心offset。如果你的消费者在轮询循环中对已使用的消息进行处理,那么自动的offset将保证你将永远不会提交未处理的offset。自动的offset提交的主要缺点是你无法控制可能需要重复处理的重复消息的数量。因为你的客户端在处理了一些记录之后在自动提交开始之前就停止了。如果你做了一些有趣的事情,比如将记录传递给另外一个在后台处理的线程,那么自动提交可能回提交消费者已读但是尚未处理的消息的offset。 第四个相关的参数是auto.commit.interval.ms。如果选择自动提交offset,则此配置允许你提交offset的最低频率。默认是5秒一次。通常,更频繁的提交回增加一些额外的开销,但是回减少用户停止时可能出现的重复次数。

    Explicitly Committing Offsets in Consumers 精确的commit 消费者的offset

    如果使用自动commit offset。就不需要担心显示提交offset。但是如果你决定需要更多的控制offset提交的事件,或者是为了最小化的重复。或者是因为你在主消费者轮询循环之外进行了消息处理,那么你确实需要考虑如何提交offset。我们不会在这里讨论提交offset所涉及的机制和api,因为在第四章中已经有深入的介绍。相反,我们将回顾在并发可靠地处理数据的消费者时的一些重要的事项。我们将从简单可能明显的要点考试,然后转向复杂的模式。

    Always commit offsets after events were processed 始终在处理消息后提交offset

    如果在轮询循环中进行所有的处理,并且不维护轮询循环之间的状态,比如聚合,那么这就非常容易,你只需要自动提交配置,在轮询循环结束时提交commit。

    Commit frequency is a trade-off between performance and number of duplicates in the event

    of a crash 提交频率是在发生崩溃时性能和重复消息数量之间的权衡 即使在最简单的情况下,也可以选择一个选中提交多次或者选择每几个循环提交一次。提交有一些性能开销,类似于使用acks=all,因此它完全取决于你的平衡。

    Make sure you know exactly what offsets you are committing 确保你清楚精确的commit offset是什么

    在轮询循环中提交offset时,一个常见的陷阱是意外提交了轮询时读取的最后一个offset,而不是已处理的最后一个offset。请记住,在消息呗处理后始终提交offset是至关重要的。对于已读但是未呗处理的消息提交offset可能会导致消费者丢失消息。第四章中说明了如何做到这一点。

    Rebalances 重平衡

    在设计你的消费者程序的时候,请记住将会发生消费者的reblance,你需要正确处理它们,在第四章中包含了一些示例。更重要的是,这通常涉及在分区被撤销之前的offset。以及在分配新分区时清理维护的任何状态。

    Consumers may need to retry 消费者可能也需要重试

    在某些情况下,在调用轮询并处理记录之后,有些记录没有被完全处理,需要稍后处理。例如,你可能试图从kafka写入记录到数据库,但是发现数据库此时不可用,你可能希望稍后重试。注意,与创痛发布订阅系统不同,你提交的offset,而不是单个的ack。这意味着,如果你未能处理30 而成功处理31 ,则不能提交31,这会导致提交到31的结果会报告30,通常这不是你想要的。相反,你可以尝试来一种模式。 当遇到可重试的错误时,一个选项时提交成功处理最后的一条记录,然后仍然需要处理的记录存储在缓冲区中,并继续尝试处理这些记录。在尝试处理所有记录时,你可能需要保持轮询。你可以使用消费者的pause方法来确保额外的轮询不会返回额外的数据,从而使重试更加容易。 第二种选择是,当遇到可重复的错误时,将其写入到单独的topic并继续。可以使用单独的消费者或者消费者组去重新处理重试topic种的消息。或者一个消费者可以同时订阅包括重复主题在内的多个主题,但在重试之间暂停重试topic。此模式类似于许多消息传递系统种的死信队列系统。

    Consumers may need to maintain state 消费者需要维护状态

    在某些应用程序种,需要跨多个轮询调用维护状态。例如,如果你想计算移动平均的消费数量。你将希望每次在选举kafka获取信的消息后更新平均值。一种方法时在提交offset的同时将最新的累计值写入result的topic。这意味着,当一个线程启动时,它可以在启动时获取最新的累计值,并从它停止的地方获取。然而,这并不能完全解决问题,因为kafka还没提供事务。你可能会编写最小的结果之后和提交offset在崩溃之前,反之亦然。一般来说,这事一个相当复杂的问题。我们不建议你自己解决它。而是建议你查看像kafka Stream这些库,它未聚合,连接,窗口及其他复杂分析提供了类似DSI的高级API。

    Handling long processing times 处理较长的处理时间

    有时处理记录需要很长的时间,丽日,你可能正在于可能阻塞的服务交互,或者正在进行非常复杂的计算。在kafka消费者的某些版本种,轮询停止的时间不能超过几秒。即使你不想处理其他的记录,也必须继续轮询,以便消费者能够将心跳发送到broker。在这种情况下,一种常见的模式是在可能的情况下将数据传输给多线程的线程池。通过冰箱处理来提高速度。在记录传递给工作线程之后,你可以暂停消费者并继续轮询。直到工作线程完成为止。实际上不需要额外的数据。一旦他们完成,你就可以恢复消费者。因为消费者从不停止轮询。所以心跳将按计划发送,reblance不会被触发。

    Exactly-once delivery 精确一次传递

    一些应用不仅需要最少一次的语义,意味着没有数据丢失。而且也需要精确一次的语义。而kafka不提供完整的exactly-once支持。消l费者可以使用一些技巧。以保证每条消息在kafka将写入外部系统时只发生一次。注意,这并不能处理kafka在生产者写入时导致的数据重复。 最简单最常见的办法就是将数据结果写入职位唯一key的系统。这报告所有的key-value存储,关系数据库,以及Elasticsearch。可能还有跟多的数据存储。当结果写入关系数据库或者Elasticsearch时,哎哟吗记录本身包含一个唯一的key,要么可以使用topic,分区和offset创建一个唯一的key。它唯一地表示kafka的记录。如果你写记录写入为具有唯一key的值,并且稍后又意外地使用了相同的记录,那么你将写入完全相同key和value。数据存储将覆盖现有的存储,你将得到相同的结果,而没有意外的重复,这种模式称为幂等写,非常常见和有用。 另外要给选项时在向具有事务的系统写入时使用,关系数据库时最简单的例子,但是HDFS又原子的重命名,通常用于相同的目的。其思想时同一个事务种写入记录及其offset,这样他们就会同步。在启动时,检索写入外部存储系统的最小记录的offset,然后使用consumer.seek再次重这些offset开始消费。第四章种包含了一个这样的例子。

    Validating System Reliability 系统可靠性验证

    一旦你完成了确定你需要的可靠性需求和配置、broker,客户端以及最适合你的用例的方式使用api,你就可以在生产环境种运行一切,并确信不会漏掉任何消息,对吗? 你可以这样做,但是我们建议先进行一些验证,我们建议三层验证:配置验证,验证应用程序和监视生产中的应用程序,让我们看看每个步骤,看看需要验证什么以及如何验证。

    Validating Configuration 配置验证

    在隔离于应用程序逻辑的情况下,很容易测试broker和客户端配置,建议这样做又如下两个原因:

    它有助于测试你所选的配置是否满足你的需求通过系统的预期行为推理是很好的练习。这一章理论性比较强,所以检查你对理论如何应用于实践的理解是很重要的。

    kafka通过两个重要的工具来帮助验证。org.apache.kafka.tools包含VerifiableProducer 和VerifiableConsumer,他们可以做为命令行工具运行,或者嵌入到自动会测试框架中。 其思想是可验证的生产者生成一系列消息,其中包含从1到你选择的数字,你可以像配置自己的生产者一样配置它,设置正确的ack数量,重试和生成消息的速率。当运行它的时候,它将根据接收的ack打印发送到broker的每个消息的成功或者错误。可验证的消费者执行补充检查。它使用实践,通常式由可验证的生产者生成的事件并按顺序打印所使用的事件。它还打印关于提交和reblance事件。 你还应该考虑要运行哪些测试。如:

    leader选举,如果我们kill了leader会怎么样?生产者和消费者多长时间才能恢复正常工作?控制器选择,重启控制器后,系统需要多少时间才能恢复?滚动重启,我们可以之歌重启broker而不丢失任何消息吗?不干净的leader选举测试,当我们逐个kill一个分区的所有副本,以确保每个副本不同步,然后启动一个不同步的broker,会发生什么?为了恢复澳洲,需要怎么做?这是可以接收的吗?

    然后选择一个场景,启动可验证的生产者、可验证的消费者,并允许整个场景,如:kill掉分区的leader之后仍然写入消息。如果你希望短暂的暂停,然后一切恢复正常,没有消息丢失,请确保生产者生成的消息数量和消费者消耗的消息数量的匹配。 Apache的源代码包中包括一个扩展的测试套件,套件中的血多测试都是基于同样的原则。例如,使用可验证的生产者和消费者来确保滚动的升级工作。

    Validating Applications 验证应用程序

    一旦确定broker和客户端配置满足要求,就该测试该应用程序是否提供了足够的保证。这将检查定制的错误处理代码,offset提交,reblance监听器以及应用程序逻辑与kafka客户端交互的类似位置。 当然,因为它是你的应用程序,所以对于如何测试它,我们只能提供这么多指导,希望你已经将应用程序的集成测试做为开发过程的一部分。无论你如何验证你的程序,我们建议在各种失败条件下运行测试:

    客户端失去对服务端的连接(模拟网络故障)leader选举滚动重启broker滚动重启消费者滚动重启生产者

    对于每个测试场景,你都将看到预期的行为,这事你在开发应用程序时计划看到的行为。然后你可以运行测试来查看实际发生了什么。例如,你计算在用户滚动启动时,你可以计划在用户重新平衡时短暂暂停,然后继续使用不超过1000个重复值的消费。你的测试将显示应用城西提交的offset和处理reblance的方式是否确实以此种方式工作。

    Monitoring Reliability in Production生产环境可靠性监控

    测试应用程序很重要,但是它不能取代持续的监视生产系统以确保数据的预期流动重要。在第9章中将介绍如何监视kafka集群,但是出了监控集群的运行状况之外,监控客户端和系统中的数据流也很重要。 首先kakfa的java客户端包括允许监控客户端状态的事件和jmx度量。对于生产者来说,可靠性最重要的两个指标是每条记录的错误率和重试率。请密切关注这些情况。因为错误率和重试率上升可能表明系统存在问题,还要监视生产者的日志,确认发送消息日志的级别,在warn级别,如果出现“Got error produce response with correlation id 5689 on topic-partition [topic-1,3], retrying (two attempts left). Error: …” 如果你看到剩下0次重试,则生产者的重试已用完。根据之前章节的内容,你可能需要增加重试的数量。或者解决导致错误出现的问题。 在消费者方面,最重要的衡量指标是消费者的滞后,此指标提示消费者据力提交到broker上分区的最新消息有多远。理想情况下,延迟总是为0,用户总是读取最新消息。在实际上,由于调用poll返回多个消息,然后消费者获取更多的消息之前会花时间处理他们,因此延迟总是会有一点波动。重要的是确保消费者最终追上,而不是最终越来越落后。因为预期的消费者延迟波动,在指标设置为传统的警报可能具有挑战性。Burrow是Linkedin的一个消费者检查器。它可以使这一点很容易的完成。 监视数据流还意味着确保以及时的方式使用所有生产者的数据,为了确保数据的及时性,你需要直到数据是如何生成的。kafka在这方面提供了帮助。从0.10.0开始。所有消息都包含一个时间戳,表面消息的产生时间。如果你允许的是比较早的客户端,我们建议为每个消息记录时间戳,生成消息的应用程序名称和创建消息的主机名。这将有助于跟踪问题的来源。 为了确保在合理的时间内消耗所生成的消息,你将要生成代码的应用程序记录生成的消息数量,通常称为每秒事件。消费者需要使用消息事件戳激励所消耗的消息数量。还需要记录从生产者到消费者消费的事件间隔。然后,你需要一个系统来协调来自生产者和消费者的每秒事件数,以确保在传输过程中没有丢失。并确保时间事件之间的间隔在合理的时间内生成。为了更好的监视,你可以在关键的topic上添加一个监视的消费者,该消费者将对事件消息进行计数并将其与生成的事件进行比较,这样即使在给定的时间点上没有人消费消息,你也可以获得对生产者的准确监控。这些类型的端到端的监控系统可能具有挑战性和费时性。如我们所知,这种类型的监控系统目前还没有开源的实现。但是confluent提供了一个商业实现做为confluent中的一部分。

    Summary 总结

    正如我们在文章开篇所描述的那样,可靠性不仅仅是一个具体的kafka功能问题。你需要构一个完整的可靠性系统,包括应用程序的体系结构,应用程序使用生产者和消费者api的方式,生产者和消费者的配置,topic和broker的配置灯灯。细羽系统更加可靠目必须在应用程序的复杂性,性能、可用性和磁盘空间使用方面进行权衡。通过理解所有的选项和公共的模式,并理解用例的需求,你可以根据应用程序和kafka部署的可靠性以及哪些权衡是有意义的做出明智的决定。

    Processed: 0.011, SQL: 9