本文参考了网上部分文章,并进行了说明和归纳,详情请查看参考。
官方文档参考:https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools
kafka环境中机器磁盘告警很容易出现,原因可能是某一个topic的partition为1(或者partition不足导致的),只往一台机器上写数据,造成kafka集群空间使用不均。
下面主要使用kafka-topics.sh和kafka-reassign-partitions.sh来解决问题。
推荐使用kafka manager来管理kafka集群。
而kafka的partition离不开Controller 的监听,先介绍一下Controlle。
Controller 在初始化时,会利用 ZK 的 watch 机制注册很多不同类型的监听器,当监听的事件被触发时,Controller 就会触发相应的操作。
Controller 在初始化时,会注册多种类型的监听器,主要有以下6种:
监听 /admin/reassign_partitions 节点,用于分区副本迁移的监听;监听 /isr_change_notification 节点,用于 Partition Isr 变动的监听,;监听 /admin/preferred_replica_election 节点,用于需要进行 Partition 最优 leader 选举的监听;监听 /brokers/topics 节点,用于 Topic 新建的监听;监听 /brokers/topics/TOPIC_NAME 节点,用于 Topic Partition 扩容的监听;监听 /admin/delete_topics 节点,用于 Topic 删除的监听;监听 /brokers/ids 节点,用于 Broker 上下线的监听。本文主要讲解第一部分,也就是 Controller 对 Partition 副本迁移的处理。
./bin/kafka-topics.sh --zookeeper vlnx111122:2181 --alter --topic test --partitions 6
此命令执行完之后即可再kafka集群其他机器中找到此topic的目录
只要配置zookeeper.connect为要加入的集群,再启动Kafka进程,就可以让新的机器加入到Kafka集群。但是新的机器只针对新的Topic才会起作用,在之前就已经存在的Topic的分区,不会自动的分配到新增加的物理机中。
为了使新增加的机器可以分担系统压力,必须进行消息数据迁移。Kafka提供了kafka-reassign-partitions.sh进行数据迁移。
这个脚本提供3个命令:
--generate: 根据给予的Topic列表和Broker列表生成迁移计划。generate并不会真正进行消息迁移,而是将消息迁移计划计算出来,供execute命令使用。--execute: 根据给予的消息迁移计划进行迁移。--verify: 检查消息是否已经迁移完成。Partition 的副本迁移实际上就是将分区的副本重新分配到不同的代理节点上,如果 zk 中新副本的集合与 Partition 原来的副本集合相同,那么这个副本就不需要重新分配了。
Partition 的副本迁移是通过监听 zk 的 /admin/reassign_partitions 节点触发的,Kafka 也向用户提供相应的脚本工具进行副本迁移,副本迁移的脚本使用方法如下所示:
./bin/kafka-reassign-partitions.sh --zookeeper XXX --reassignment-json-file XXX.json --execute
其中 XXX.json 为要进行 Partition 副本迁移的 json 文件,json 文件的格式如下所示:
{ "version":1, "partitions":[ { "topic":"__consumer_offsets", "partition":19, "replicas":[ 3, 9, 2 ] }, { "topic":"__consumer_offsets", "partition":26, "replicas":[ 2, 6, 4 ] }, { "topic":"__consumer_offsets", "partition":27, "replicas":[ 5, 3, 8 ] } ] }这个 json 文件的意思是将 Topic __consumer_offsets Partition 19 的副本迁移到 {3, 2, 9} 上,Partition 26 的副本迁移到 {6, 2, 4} 上,Partition 27 的副本迁移到 {5, 3, 8} 上。
包括配置broker id等
~/kafka/bin/kafka-server-start.sh -daemon ~/kafka/config/server.properties
停止命令:~/kafka/bin/kafka-server-stop.sh
topic为test目前在broker id为1,2,3的机器上,现又添加了两台机器,broker id为4,5,现在想要将压力平均分散到这5台机器上。
手动生成一个json文件topic.json
{ "topics": [ {"topic": "test"} ], "version": 1 },将test扩充到所有机器上
./bin/kafka-reassign-partitions.sh --zookeeper vlnx111122:2181 --topics-to-move-json-file topic.json --broker-list "1,2,3,4,5" --generate
生成类似于下方的结果
Current partition replica assignment {"version":1, "partitions":[....] } Proposed partition reassignment configuration {"version":1, "partitions":[.....] }Current partition replica assignment表示当前的消息存储状况。
Proposed partition reassignment configuration表示迁移后的消息存储状况。
那么我们就用下部分的json进行迁移。另外,也可以自己构造个类似的json文件,同样可以进行迁移。
将迁移后的json存入一个文件reassignment.json,供--execute命令使用。
./bin/kafka-reassign-partitions.sh --zookeeper vlnx111122:2181 --reassignment-json-file reassignment.json --execute
Current partition replica assignment ... Save this to use as the --reassignment-json-file option during rollback ..../bin/kafka-reassign-partitions.sh --zookeeper vlnx111122:2181 --reassignment-json-file reassignment.json --verify
分区由原来一个变成了三个,并且集群里面的所有kafka,data目录都会产生相应的文件夹
注意:0,1,2是kafka配置文件中的broker.id,在这里只是生成配置文件,并没有真正均衡
将2中,生成的json,copy到一个json文件中,执行以下命令
# bin/kafka-reassign-partitions.sh --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --reassignment-json-file exec.json --execute Current partition replica assignment {"version":1,"partitions":[{"topic":"track_pc","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"track_pc","partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"track_pc","partition":1,"replicas":[2],"log_dirs":["any"]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions. [root@bigserver2 kafka]# bin/kafka-reassign-partitions.sh --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --reassignment-json-file exec.json --verify Status of partition reassignment: Reassignment of partition track_pc-0 is still in progress Reassignment of partition track_pc-2 is still in progress Reassignment of partition track_pc-1 is still in progress [root@bigserver2 kafka]# bin/kafka-reassign-partitions.sh --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --reassignment-json-file exec.json --verify Status of partition reassignment: Reassignment of partition track_pc-0 completed successfully Reassignment of partition track_pc-2 completed successfully Reassignment of partition track_pc-1 completed successfully注意:到这里均衡已结束,但是对已有的数据,并不能起到均衡作用。
broker会watch "/admin/reassign_partitions"节点。当发现有迁移任务的时候,会将partiton的AR进行扩展,例如原先partiton的AR是[1, 2], 现在要迁移到[2, 3],那么partiton会先将AR扩展到[1, 2, 3],并监控ISR的变化。
当replica-2和replica-3都跟上后,即在ISR中的时候,表明新的repica-3已经和leader数据同步了。这个时候就可以将replica-1剔除了,最后得到迁移结果是[2, 3]。即迁移是一个先增加再减少的过程。
在2.1.2中已经说明了,该错误是由于"/admin/reassign_partitions"节点已经被删除了,但是AR和目标迁移列表不相同报的错,一般需要看下controller的日志,看下controller在迁移过程中是不是抛出了异常。
迁移需要等目标迁移列表中的replic都跟上了leader才能完成,目前迁移列表一直跟不上,那么就不会完成。可以看下zk中“/brokers/topics/{topic}/partitions/{partiton}/state”,注意下目标迁移列表是不是在isr中,如果不在说明要迁移的replic还没有完成从leader拉取数据。具体为甚么没有拉取成功,可能是数据量比较大,拉取需要一定的时间;也可能是其他原因比如集群宕机了等,需要具体分析下
Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限。当重新平衡集群,引导新broker,添加或移除broker时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响。
有2个接口可以实现限制。最简单和最安全的是调用kafka-reassign-partitions.sh时加限制。另外kafka-configs.sh也可以直接查看和修改限制值。
例如,当执行重新平衡时,用下面的命令,它在移动分区时,将不会超过50MB/s(--throttle 50000000)。
$ bin/kafka-reassign-partitions.sh --zookeeper myhost:2181--execute --reassignment-json-file bigger-cluster.json --throttle 50000000当你运行这个脚本,你会看到这个限制:
The throttle limit was set to 50000000 B/s Successfully started reassignment of partitions.如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行execute命令,用相同的reassignment-json-file:
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file bigger-cluster.json --throttle 700000000 There is an existing assignment running. The throttle limit was set to 700000000 B/s一旦重新平衡完成,可以使用--verify操作验证重新平衡的状态。如果重新平衡已经完成,限制也会通过--verify命令移除。这点很重要,因为一旦重新平衡完成,并通过--veriry操作及时移除限制。否则可能会导致定期复制操作的流量也受到限制。
当--verify执行,并且重新分配已完成时,此脚本将确认限制被移除:
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --verify --reassignment-json-file bigger-cluster.json Status of partition reassignment: Reassignment of partition [my-topic,1] completed successfully Reassignment of partition [mytopic,0] completed successfully Throttle was removed.管理员还可以使用kafka-configs.sh验证已分配的配置。有2对限制配置用于管理限流。而限制值本身,是个broker级别的配置,用于动态属性配置:
leader.replication.throttled.rate follower.replication.throttled.rate此外,还有枚举集合的限流副本:
leader.replication.throttled.replicas follower.replication.throttled.replicas其中每个topic配置,所有4个配置值通过kafka-reassign-partitions.sh(下面讨论)自动分配。
查看限流配置:
$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000 Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000这显示了应用于复制协议的leader和follower的限制。默认情况下,2个都分配了相同的限制值。
要查看限流副本的列表:
$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101, follower.replication.throttled.replicas=1:101,0:102这里我们看到leader限制被应用到broker 102上的分区1和broker 101的分区0.同样,follower限制应用到broker 101的分区1和broker 102的分区0.
默认情况下,kafka-reassign-partitions.sh会将leader限制应用于重新平衡前存在的所有副本,任何一个副本都可能是leader。它将应用follower限制到所有移动目的地。因此,如果broker 101,102上有一个副本分区,被分配给102,103,则该分区的leader限制,将被应用到101,102,并且follower限制将仅被应用于103。
如果需要,你还可以使用kafka-configs.sh的--alter开关手动地更改限制配置。
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools
https://wzktravel.github.io/2015/12/31/kafka-reassign/
http://blog.51yip.com/hadoop/2131.html
https://www.cnblogs.com/set-cookie/p/9614241.html
https://matt33.com/2018/06/16/partition-reassignment/
https://www.orchome.com/510