两个更新 HW、LEO 的方法如下:
def highWatermark_=(newHighWatermark: LogOffsetMetadata) { if (isLocal) { //只有本地副本才能更新 HW highWatermarkMetadata = newHighWatermark } else {.... } } private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) { if (isLocal) { //对于本地副本,不能直接更新LEO,LEO由log.LEO字段决定... } else { //远程副本,直接更新 logEndOffsetMetadata = newLogEndOffset } }2.1 创建副本
//get / create replica 这里似乎直接就 create 没有get的过程 def getOrCreateReplica(replicaId: Int = localBrokerId, isNew: Boolean = false): Replica = { //这里直接蕴含了get / put两种操作了,scala好骚阿--【上锁】 assignedReplicaMap.getAndMaybePut(replicaId, { if (isReplicaLocal(replicaId)) { //本地副本,恢复/初始化 log,checkpoint,hw,等 val config = LogConfig.fromProps(logManager.defaultConfig.originals, AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)) val log = logManager.getOrCreateLog(topicPartition, config, isNew) val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent) val offsetMap = checkpoint.read() if (!offsetMap.contains(topicPartition)) info(s"No checkpointed highwatermark is found for partition $topicPartition") val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset) //上面都是在准备 初始值 new Replica(replicaId, topicPartition, time, offset, Some(log)) } else new Replica(replicaId, topicPartition, time) //不是本地副本,直接new一个, }) }2.2 副本角色切换
makeLeader() 是角色切换的入口。该方法的传入参数有 controllerId(易理解)、correlationId(关联 id),以及最重要的参数 PartitionState——其中包含了 leader 由谁担任。我们先看一下该类的字段:
int leader,             // leader 副本所在的 broker id
int leaderEpoch,
List&lt;Integer&gt; isr,      // ISR 集合中所有副本所在的 broker id
int zkVersion,
List&lt;Integer&gt; replicas  // AR 集合中所有副本所在的 broker id

makeLeader() 方法会把 PartitionState 中 leader 字段指定的副本设置为 leader;其返回值是 boolean,表示 leader 是否发生了迁移(新的 leader 可能仍在旧 leader 所在的 broker 上)。一个 broker 对应一个 Kafka server。
def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = { val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) { //加锁 //获取需要分配的 AR集合 val allReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt) controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch //1. 获取ISR集合,并且创建replica 对象 val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet //2. 根据AR 跟新assignedReplicas集合 (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica) //3. 更新partition字段 inSyncReplicas = newInSyncReplicas //更新ISR集合 leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch //4. 创建AR集合中所有replica 的对象 allReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew)) zkVersion = partitionStateInfo.basePartitionState.zkVersion //5. 检测leader是否变化 val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true) val leaderReplica = getReplica().get val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset val curTimeMs = time.milliseconds (assignedReplicas - leaderReplica).foreach { replica => val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs) } if (isNewLeader) { //6. 初始化new leader 的HW leaderReplica.convertHWToLocalOffsetMetadata() leaderReplicaIdOpt = Some(localBrokerId) //7.重置所有remote replica的leo值为-1 assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult)) } //8. 尝试更新 HW (maybeIncrementLeaderHW(leaderReplica), isNewLeader) } // some delayed operations may be unblocked after HW changed if (leaderHWIncremented) tryCompleteDelayedRequests() isNewLeader }2.3 ISR集合管理
当follower不断同步leader的record,那么follower的LEO就会逐渐后移,慢慢追上leader的LEO,此时follower就有了资格加入ISR,那么这里就需要对ISR集合进行扩张。
def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = { inWriteLock(leaderIsrUpdateLock) { //获取 leader replica对象 leaderReplicaIfLocal match { case Some(leaderReplica) => val replica = getReplica(replicaId).get val leaderHW = leaderReplica.highWatermark //获取当前hw //下面检测的三个条件依次是:follower副本不在isr集合中,ar集合可以查到follower,follower副本的leo追赶上了hw if (!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && replica.logEndOffset.offsetDiff(leaderHW) >= 0) { //将follower副本计入ISR集合中, val newInSyncReplicas = inSyncReplicas + replica //更新 updateIsr(newInSyncReplicas) replicaManager.isrExpandRate.mark() } maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs) case None => false // nothing to do if no longer leader } } }同时也有缩减ISR的方法,这里不再叙述
2.4 追加消息
对于一个 Partition,只有 leader replica 才有资格追加 producer 发来的消息。
def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = { val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) { leaderReplicaIfLocal match { case Some(leaderReplica) => val log = leaderReplica.log.get val minIsr = log.config.minInSyncReplicas //获取配置指定的最小ISR集合大小限制 val inSyncSize = inSyncReplicas.size //如果当前ISR集合size<规定大小,而生产者要求较高的可用性,则不能追加消息 if (inSyncSize < minIsr && requiredAcks == -1) {... } //写入副本对于的Log val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient) replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId)) //尝试增加 HW (info, maybeIncrementLeaderHW(leaderReplica)) ... } }