Flink流连接器之Kafka【二】【Kafka Offset设置、容错、主题和分区动态发现】

    技术2023-12-02  239

    一.Kafka offset 配置

    Flink Kafka Consumer可以配置Kafka分区的起始位置。 代码:

    //获取数据源 kafka val consumer : FlinkKafkaConsumer09[String] = new FlinkKafkaConsumer09[String]( "spark", new SimpleStringSchema(), properties ) //val hashMap : util.Map[KafkaTopicPartition, java.lang.Long] = new util.HashMap[KafkaTopicPartition, java.lang.Long]() consumer.setStartFromEarliest() // 尽可能从最早的记录开始消费 consumer.setStartFromLatest() // 从最新的记录开始消费 consumer.setStartFromGroupOffsets() // 根据组的offset位置开始消费 //consumer.setStartFromTimestamp(milliseconds) // 在kafka0.9中去掉 consumer.setStartFromSpecificOffsets(new util.HashMap[KafkaTopicPartition, java.lang.Long]()) // kafka0.9新增,从特定offset开始消费 val kafkaDataStream : DataStream[String] = env.addSource(consumer)

    详解:

    setStartFromGroupOffsets(默认行为):开始从消费者组(group.id在消费者属性中的设置)中在Kafka代理(或Kafka 0.9的Zookeeper)中提交的偏移中读取分区。如果找不到分区的偏移量,auto.offset.reset则将使用属性中的设置。setStartFromEarliest()/ setStartFromLatest():从最早/最新记录开始。在这些模式下,Kafka中已提交的偏移量将被忽略,并且不会用作起始位置。setStartFromTimestamp(long)【失效,0.8版本可用】:从指定的时间戳开始。对于每个分区,其时间戳大于或等于指定时间戳的记录将被消费。如果分区的最新记录早于时间戳,则将仅从最新记录之后开始消费。在这种模式下,Kafka中已提交的偏移量将被忽略,并且不会用作起始位置。setStartFromSpecificOffsets(new util.HashMap[KafkaTopicPartition, java.lang.Long]()):为每个分区指定使用者应从其开始的确切偏移量。 详细配置如下: val hashMap : util.Map[KafkaTopicPartition, java.lang.Long] = new util.HashMap[KafkaTopicPartition, java.lang.Long]() hashMap.put(new KafkaTopicPartition("spark", 0), 23L) hashMap.put(new KafkaTopicPartition("spark", 1), 31L) hashMap.put(new KafkaTopicPartition("spark", 2), 43L) consumer.setStartFromSpecificOffsets(hashMap)

    上面的示例将消费者配置为从topic为spark的分区0、1和2的指定偏移量处开始消费数据。偏移量应该是消费者消费每个分区读取的下一条记录。请注意,如果消费者读取提供的偏移量映射中没有指定偏移量的分区,则它将回退到setStartFromGroupOffsets()从该特定分区的默认组偏移量处开始消费。

    请注意,当作业从故障中自动还原或使用保存点手动还原时,这些起始位置配置方法不会影响起始位置。还原时,每个Kafka分区的开始位置由保存点或检查点中存储的偏移量确定。

    二.Kafka消费者与容错

    启用Flink的检查点后,Flink Kafka消费者将使用主题中的记录,并以一致的方式定期检查检查点其所有Kafka偏移量以及其他操作的状态。万一作业失败,Flink将把流式程序恢复到最新检查点的状态,并从检查点中存储的偏移量开始重新使用Kafka的记录。

    因此,检查点的间隔定义了在出现故障的情况下最多可以返回多少程序。

    要使用容错的Kafka消费者,需要在执行环境中启用拓扑检查点:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); // checkpoint every 5s

    还要注意,只有在有足够的处理插槽可用于重新启动拓扑检查点时,Flink才能重新启动拓扑。因此,如果拓扑由于丢失TaskManager而失败,则此后仍必须有足够的可用插槽。YARN上的Flink支持自动重启丢失的YARN容器。

    如果未启用检查点,Kafka消费者定期将偏移量提交给Zookeeper。

    三.Kafka消费者主题和分区动态发现

    分区发现 Flink Kafka Consumer支持发现动态创建的Kafka分区,并使用一次精确的保证来消费它们。最初检索分区元数据后(即,作业开始运行时)发现的所有分区将从最早的偏移量开始消耗。

    默认情况下,分区发现是禁用的。要启用它,请flink.partition-discovery.interval-millis在提供的属性配置中设置一个非负值,表示发现间隔(以毫秒为单位)。

    限制从使用Flink 1.3.x之前的Flink版本的保存点还原消费者时,无法在还原运行中启用分区发现。如果启用,还原将失败,并出现异常。在这种情况下,为了使用分区发现,请首先在Flink 1.3.x中获取一个保存点,然后从中再次进行恢复。

    主题发现 在较高级别,Flink Kafka Consumer还能够使用正则表达式基于主题名称的模式匹配来发现主题。请参阅以下示例:

    val env = StreamExecutionEnvironment.getExecutionEnvironment() val properties = new Properties() properties.setProperty("bootstrap.servers" , "master:9092,slave01:9092,slave02:9092") properties.setProperty("zookeeper.connect" , "master:2181,slave01:2181,slave02:2181") properties.setProperty("group.id" , "spark") val consumer : FlinkKafkaConsumer09[String] = new FlinkKafkaConsumer09[String]( java.util.regex.Pattern.compile("spark[0-9]"), new SimpleStringSchema, properties) val stream = env.addSource(consumer)

    在上面的示例中,当作业开始运行时,使用者将订阅名称与指定的正则表达式匹配的所有主题。

    要允许消费者在作业开始运行后发现动态创建的主题,请为设置非负值flink.partition-discovery.interval-millis。这使使用者可以发现名称也与指定模式匹配的新主题的分区。

    四.Kafka消费者抵消承诺行为配置

    Flink Kafka Consumer可以配置如何将偏移量提交回Kafka brokers(或0.8中的Zookeeper)的行为。请注意,Flink Kafka Consumer不依靠承诺的偏移量来提供容错保证。承诺的偏移量仅是出于监视目的公开用户进度的一种方式。

    配置偏移提交行为的方式有所不同,具体取决于是否为作业启用了检查点。

    禁用检查点:如果禁用检查点,则Flink Kafka消费者将依赖内部使用的Kafka客户端的自动定期偏移量提交功能。因此,要禁用或启用偏移提交,只需在提供的配置中将enable.auto.commit(或auto.commit.enable 对于Kafka 0.8)/ auto.commit.interval.ms键设置为适当的属性值。启用检查点:如果启用检查点,则Flink Kafka Consumer将在检查点完成时提交存储在检查点状态中的偏移量。这样可以确保Kafka代理中的已提交偏移量与检查点状态中的偏移量一致。用户可以通过setCommitOffsetsOnCheckpoints(boolean)在消费者上调用方法来选择禁用或启用偏移提交 (默认情况下,行为是true)。请注意,在这种情况下,将完全忽略自动定期偏移提交设置。
    Processed: 0.018, SQL: 9