Kafka Source Code: The Replica Mechanism


    A replica (Replica): each partition has multiple replicas, so a topic, through its several partitions, owns many replicas in total. With 3 replicas there is 1 leader and 2 followers. The leader serves all reads and writes, while each follower continuously pulls messages from the leader and saves them into its own log.
    Normally the replicas of one partition are spread across different brokers; when the broker hosting the leader fails, a new leader is elected from the remaining followers.
    If several replicas of a partition landed on the same broker, there would be no fault tolerance, and the load would also be unbalanced.

    1. Replica

    Kafka represents a replica with the Replica class. A replica carries three important quantities: the ISR set, the HW, and the LEO.
    ISR set: a subset of the partition's replicas, containing the replicas that are "available" and whose messages are not far behind the leader's; concretely, the offset of a replica's last message must lag the leader's lastOffset by less than a configured threshold, and a replica that exceeds the threshold is kicked out of the ISR.
    HW: marks a special offset and, like the ISR, is maintained by the leader. Once every replica in the ISR has appended the record at that offset, the HW advances. Everything below the HW counts as "committed": every replica in the ISR holds it, so it is not lost even if the leader crashes. Consumers can only see records below the HW, never records at or beyond it.
    LEO: the offset of the last record appended to the log. Every replica manages its own LEO.
    Key fields:

    // the HW (highWatermark), updated and maintained by the leader replica
    @volatile private[this] var highWatermarkMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
    // the LEO: for a local replica, the offset of the newest message appended to the Log;
    // for a remote replica the meaning differs in that it has to be updated from fetch requests
    @volatile private[this] var logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
    // timestamp of the last time this follower replica caught up with the leader
    @volatile private[this] var lastFetchLeaderLogEndOffset = 0L
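    To make the HW rule concrete, here is a toy sketch (not Kafka source; the offsets are made up) of how the leader derives the HW from the LEOs of the ISR: the slowest in-sync replica bounds the HW, and consumers only see offsets below it.

    object HwSketch {
      def main(args: Array[String]): Unit = {
        val leaderLeo = 105L
        val followerLeos = Map(1 -> 103L, 2 -> 100L) // broker id -> LEO, hypothetical values
        // the ISR here is the leader plus both followers
        val isrLeos = followerLeos.values.toSeq :+ leaderLeo
        val hw = isrLeos.min // 100: records in [0, 100) are "committed"
        println(s"HW = $hw, records at offsets >= $hw are invisible to consumers")
      }
    }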

    The two methods that update the HW and the LEO:

    def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
      if (isLocal) {
        // only the local (leader) replica may update the HW
        highWatermarkMetadata = newHighWatermark
      } else {....
      }
    }

    private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
      if (isLocal) {
        // a local replica's LEO cannot be set directly; it is determined by log.LEO...
      } else {
        // a remote replica: update the LEO directly
        logEndOffsetMetadata = newLogEndOffset
      }
    }
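    The remote branch above is how the leader keeps follower LEOs current. A minimal sketch of that idea, with hypothetical names (RemoteReplicaSketch and onFetch are not Kafka APIs): a follower fetching from some offset must already hold everything before it, so the leader can take the fetch offset as that follower's LEO.

    final class RemoteReplicaSketch(val brokerId: Int) {
      @volatile private var leo: Long = -1L            // unknown until the first fetch arrives
      @volatile private var lastFetchTimeMs: Long = 0L
      // called on the leader when follower `brokerId` fetches starting at `fetchOffset`
      def onFetch(fetchOffset: Long, nowMs: Long): Unit = {
        leo = fetchOffset        // the follower holds every record below fetchOffset
        lastFetchTimeMs = nowMs
      }
      def logEndOffset: Long = leo
    }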

    2. Partition

        On the server side Kafka uses Replica for a replica and Partition for a partition. A Partition manages its own replicas: creating them, assigning the leader, maintaining the ISR set, and appending messages.

    private val localBrokerId = if (!isOffline) replicaManager.config.brokerId else -1 // broker id
    // the LogManager on the current broker; each broker has exactly one
    private val logManager = if (!isOffline) replicaManager.logManager else null
    private val zkUtils = if (!isOffline) replicaManager.zkUtils else null // ZooKeeper utility
    // all replicas of this partition, known as the AR set
    private val assignedReplicaMap = new Pool[Int, Replica]
    // The read lock is only required when multiple reads are executed and needs to be in a consistent manner
    private val leaderIsrUpdateLock = new ReentrantReadWriteLock // lock
    private var zkVersion: Int = LeaderAndIsr.initialZKVersion
    @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1 // leader epoch info
    @volatile var leaderReplicaIdOpt: Option[Int] = None // id of the leader among this partition's replicas
    @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica] // the ISR set

    2.1 Creating a Replica

    // get / create replica: at first glance this seems to create directly, with no get step
    def getOrCreateReplica(replicaId: Int = localBrokerId, isNew: Boolean = false): Replica = {
      // getAndMaybePut bundles the get and the put into one locked operation; Scala is slick here
      assignedReplicaMap.getAndMaybePut(replicaId, {
        if (isReplicaLocal(replicaId)) {
          // a local replica: recover/initialize the log, checkpoint, HW, and so on
          val config = LogConfig.fromProps(logManager.defaultConfig.originals,
            AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
          val log = logManager.getOrCreateLog(topicPartition, config, isNew)
          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
          val offsetMap = checkpoint.read()
          if (!offsetMap.contains(topicPartition))
            info(s"No checkpointed highwatermark is found for partition $topicPartition")
          val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset)
          // everything above prepares the initial values
          new Replica(replicaId, topicPartition, time, offset, Some(log))
        } else new Replica(replicaId, topicPartition, time) // not a local replica: just new one up
      })
    }
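    A minimal sketch of the getAndMaybePut semantics relied on above (assumed behavior, not the actual kafka.utils.Pool source): return the existing value when present, otherwise create and register one atomically, so concurrent callers end up sharing a single Replica instance.

    import java.util.concurrent.ConcurrentHashMap
    import java.util.function.Function

    final class PoolSketch[K, V] {
      private val map = new ConcurrentHashMap[K, V]()
      // computeIfAbsent runs the factory at most once for a given key, even under contention
      def getAndMaybePut(key: K, create: => V): V =
        map.computeIfAbsent(key, new Function[K, V] { def apply(k: K): V = create })
    }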

    2.2 Replica Role Switching

    makeLeader() is the entry point of the role switch. Of its parameters, controllerId is self-explanatory and correlationId is a correlation id; the most important one is PartitionState, which carries who becomes the leader. Its fields are:

    int leader,             // broker id hosting the leader replica
    int leaderEpoch,
    List<Integer> isr,      // broker ids of all replicas in the ISR
    int zkVersion,
    List<Integer> replicas, // broker ids of all replicas in the AR set

        makeLeader() turns the replica named by the leader field of PartitionState into the leader. It returns a Boolean indicating whether leadership actually moved to this broker (the new leader may well live on the same broker as the old one); recall that one broker corresponds to one Kafka server.

    def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
      val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) { // take the write lock
        // the AR set to be assigned
        val allReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
        controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
        // 1. take the ISR from the request and create the Replica objects
        val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala
          .map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet
        // 2. update the assignedReplicas map according to the AR set
        (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
        // 3. update the Partition fields
        inSyncReplicas = newInSyncReplicas // update the ISR set
        leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
        // 4. create the Replica objects for every replica in the AR set
        allReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
        zkVersion = partitionStateInfo.basePartitionState.zkVersion
        // 5. detect whether the leader changed
        val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
        val leaderReplica = getReplica().get
        val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
        val curTimeMs = time.milliseconds
        (assignedReplicas - leaderReplica).foreach { replica =>
          val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
          replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
        }
        if (isNewLeader) {
          // 6. initialize the new leader's HW
          leaderReplica.convertHWToLocalOffsetMetadata()
          leaderReplicaIdOpt = Some(localBrokerId)
          // 7. reset the LEO of every remote replica to -1
          assignedReplicas.filter(_.brokerId != localBrokerId)
            .foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
        }
        // 8. try to advance the HW
        (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
      }
      // some delayed operations may be unblocked after HW changed
      if (leaderHWIncremented)
        tryCompleteDelayedRequests()
      isNewLeader
    }
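    Step 8 calls maybeIncrementLeaderHW. A simplified sketch of what that step does (the real method works on LogOffsetMetadata and checks more conditions; the names and types here are illustrative): the candidate HW is the minimum LEO across the ISR, and the HW only ever moves forward.

    object MaybeIncrementHwSketch {
      // returns the (possibly unchanged) HW and whether it advanced
      def maybeIncrementLeaderHW(currentHw: Long, isrLeos: Iterable[Long]): (Long, Boolean) = {
        val newHw = isrLeos.min              // the slowest in-sync replica bounds the HW
        if (newHw > currentHw) (newHw, true) // HW advanced: delayed operations may now complete
        else (currentHw, false)              // never move the HW backwards
      }
      def main(args: Array[String]): Unit =
        println(maybeIncrementLeaderHW(100L, Seq(105L, 103L, 104L))) // (103,true)
    }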

    2.3 Managing the ISR Set

        As a follower keeps synchronizing the leader's records, its LEO moves forward and gradually catches up with the leader's LEO. At that point the follower qualifies for the ISR, so the ISR set has to be expanded.

    def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
      inWriteLock(leaderIsrUpdateLock) {
        // get the leader Replica object
        leaderReplicaIfLocal match {
          case Some(leaderReplica) =>
            val replica = getReplica(replicaId).get
            val leaderHW = leaderReplica.highWatermark // the current HW
            // three conditions, in order: the follower is not in the ISR yet,
            // the follower appears in the AR set, and the follower's LEO has caught up with the HW
            if (!inSyncReplicas.contains(replica) &&
                assignedReplicas.map(_.brokerId).contains(replicaId) &&
                replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
              // add the follower replica to the ISR set
              val newInSyncReplicas = inSyncReplicas + replica
              // persist the update
              updateIsr(newInSyncReplicas)
              replicaManager.isrExpandRate.mark()
            }
            maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)
          case None => false // nothing to do if no longer leader
        }
      }
    }

    There is likewise a method that shrinks the ISR; it is not walked through here, but the sketch below shows its core rule.
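    A hedged sketch of the shrink rule (modeled on Kafka's maybeShrinkIsr; ReplicaState and outOfSync are illustrative names, not the real API): a follower that has not caught up with the leader's LEO within replica.lag.time.max.ms is considered out of sync and is dropped from the ISR.

    object IsrShrinkSketch {
      final case class ReplicaState(brokerId: Int, lastCaughtUpTimeMs: Long)
      // replicas (other than the leader) whose last catch-up is older than maxLagMs
      def outOfSync(isr: Set[ReplicaState], leaderId: Int, nowMs: Long, maxLagMs: Long): Set[ReplicaState] =
        isr.filter(r => r.brokerId != leaderId && nowMs - r.lastCaughtUpTimeMs > maxLagMs)
      def main(args: Array[String]): Unit = {
        val isr = Set(ReplicaState(0, 1000L), ReplicaState(1, 900L), ReplicaState(2, 100L))
        // with now = 1000 and maxLag = 500, broker 2 falls out of the ISR
        println(outOfSync(isr, leaderId = 0, nowMs = 1000L, maxLagMs = 500L))
      }
    }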

    2.4 Appending Messages

        Within a Partition, only the leader replica is entitled to append the messages sent by producers.

    def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
      val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
        leaderReplicaIfLocal match {
          case Some(leaderReplica) =>
            val log = leaderReplica.log.get
            val minIsr = log.config.minInSyncReplicas // the configured minimum ISR size
            val inSyncSize = inSyncReplicas.size
            // if the current ISR is smaller than the configured minimum and the
            // producer demands high availability guarantees, the append must be refused
            if (inSyncSize < minIsr && requiredAcks == -1) {...
            }
            // append to the Log backing this replica
            val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
            replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
            // try to advance the HW
            (info, maybeIncrementLeaderHW(leaderReplica))
          ...
        }
      }
    }
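    The elided branch above rejects the append (in Kafka it throws NotEnoughReplicasException). A minimal sketch of that durability check, with illustrative names and a generic exception standing in for Kafka's: with acks = -1 the producer demands that the full ISR acknowledge, so the leader refuses to append once the ISR has shrunk below min.insync.replicas.

    object MinIsrCheckSketch {
      def checkMinIsr(inSyncSize: Int, minIsr: Int, requiredAcks: Int): Unit =
        if (inSyncSize < minIsr && requiredAcks == -1)
          throw new IllegalStateException(
            s"ISR size $inSyncSize is below min.insync.replicas $minIsr for acks=-1")
      def main(args: Array[String]): Unit = {
        checkMinIsr(inSyncSize = 3, minIsr = 2, requiredAcks = -1) // passes: ISR is large enough
        checkMinIsr(inSyncSize = 1, minIsr = 2, requiredAcks = -1) // throws: too few in-sync replicas
      }
    }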

     
