Tracing How User Code Is Transformed in Flink's Local Execution Mode

    Tech  2022-07-11

    This article traces the code path that runs when a LocalStreamEnvironment is started from an IDE.

    The test code is taken from Flink's LocalStreamEnvironmentITCase class:

    @Test
    public void testRunIsolatedJob() throws Exception {
        LocalStreamEnvironment env = new LocalStreamEnvironment();
        assertEquals(1, env.getParallelism());

        addSmallBoundedJob(env, 3);
        env.execute();
    }

    The ExecutionEnvironment Part

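    First, the DataStream operations called by the user (map, reduce, aggregations, and so on) are executed. Each of them ultimately builds a Transformation for the corresponding logic and then calls getExecutionEnvironment().addOperator() to add that Transformation to the current ExecutionEnvironment.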

    Take addSink as an example:

    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        // configure the type if needed
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
        }

        // build the corresponding Transformation
        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
        DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

        // add the Transformation to the ExecutionEnvironment
        getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }
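The recording behaviour described above can be illustrated with a small, self-contained sketch. None of the classes below are Flink classes: RecordedTransformation, RecordingEnv and RecordingStream are hypothetical stand-ins, and the assumption that the source is reachable only through the input links (rather than being registered itself) is made for this toy only. The point is just that each API call records a node and nothing runs yet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Transformation: a name plus an optional input.
final class RecordedTransformation {
    final String name;
    final RecordedTransformation input;

    RecordedTransformation(String name, RecordedTransformation input) {
        this.name = name;
        this.input = input;
    }
}

// Hypothetical stand-in for the execution environment.
final class RecordingEnv {
    final List<RecordedTransformation> transformations = new ArrayList<>();

    void addOperator(RecordedTransformation transformation) {
        transformations.add(transformation);
    }
}

// Hypothetical stand-in for DataStream: every call only records a node.
final class RecordingStream {
    private final RecordingEnv env;
    private final RecordedTransformation transformation;

    private RecordingStream(RecordingEnv env, RecordedTransformation transformation) {
        this.env = env;
        this.transformation = transformation;
    }

    // Assumption of this toy: the source is not registered, only linked as an input.
    static RecordingStream source(RecordingEnv env, String name) {
        return new RecordingStream(env, new RecordedTransformation(name, null));
    }

    RecordingStream map(String name) {
        RecordedTransformation next = new RecordedTransformation(name, transformation);
        env.addOperator(next);
        return new RecordingStream(env, next);
    }

    void addSink(String name) {
        env.addOperator(new RecordedTransformation(name, transformation));
    }
}
```

Calling RecordingStream.source(env, "source").map("map").addSink("sink") leaves two entries in env.transformations, and the sink entry links back to the source through its input chain.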

    Next comes Transformation -> StreamGraph. This step is triggered inside the execute method, and the main logic lives in StreamGraphGenerator.generate:

    public StreamGraph generate() {
        streamGraph = new StreamGraph(executionConfig, checkpointConfig);
        streamGraph.setStateBackend(stateBackend);
        streamGraph.setChaining(chaining);
        streamGraph.setScheduleMode(scheduleMode);
        streamGraph.setUserArtifacts(userArtifacts);
        streamGraph.setTimeCharacteristic(timeCharacteristic);
        streamGraph.setJobName(jobName);
        streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);

        alreadyTransformed = new HashMap<>();

        // core translation logic
        for (Transformation<?> transformation: transformations) {
            transform(transformation);
        }

        final StreamGraph builtStreamGraph = streamGraph;

        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;

        return builtStreamGraph;
    }

    The transform method looks like this:

    private Collection<Integer> transform(Transformation<?> transform) {

        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        Collection<Integer> transformedIds;
        // dispatch on the concrete type of the transformation
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        if (transform.getBufferTimeout() >= 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        } else {
            streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
        }

        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
            if (transform.getUserProvidedNodeHash() == null && transform.getUid() == null) {
                throw new IllegalStateException("Auto generated UIDs have been disabled " +
                    "but no UID or hash has been assigned to operator " + transform.getName());
            }
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }

    Let's look at one of these translations; the others should follow largely the same logic.

    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

        // recursively translate the upstream input Transformation first
        Collection<Integer> inputIds = transform(transform.getInput());

        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

        // the user logic is carried over mainly here; the key piece is getOperatorFactory
        streamGraph.addOperator(transform.getId(),
                slotSharingGroup,
                transform.getCoLocationGroupKey(),
                transform.getOperatorFactory(),
                transform.getInputType(),
                transform.getOutputType(),
                transform.getName());

        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
            transform.getParallelism() : executionConfig.getParallelism();
        streamGraph.setParallelism(transform.getId(), parallelism);
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }

        return Collections.singleton(transform.getId());
    }
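The pattern in transform and transformOneInputTransform (translate the input first, memoize what has already been translated, then add a node and an edge) can be reduced to a few lines. The sketch below is a simplification under stated assumptions: SketchTransformation and SketchGraphGenerator are hypothetical names, every transformation has at most one input, and nodes and edges are kept in plain lists rather than in a StreamGraph.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical one-input transformation: an id and an optional upstream input.
final class SketchTransformation {
    final int id;
    final SketchTransformation input;

    SketchTransformation(int id, SketchTransformation input) {
        this.id = id;
        this.input = input;
    }
}

// Hypothetical generator that mimics the memoized, input-first recursion.
final class SketchGraphGenerator {
    private final Map<SketchTransformation, Integer> alreadyTransformed = new HashMap<>();
    final List<Integer> nodes = new ArrayList<>();
    final List<String> edges = new ArrayList<>();

    int transform(SketchTransformation transformation) {
        Integer known = alreadyTransformed.get(transformation);
        if (known != null) {
            return known; // translated earlier, possibly via another path
        }
        if (transformation.input != null) {
            // translate the upstream side first, then connect it to this node
            int inputId = transform(transformation.input);
            edges.add(inputId + "->" + transformation.id);
        }
        nodes.add(transformation.id);
        alreadyTransformed.put(transformation, transformation.id);
        return transformation.id;
    }
}
```

With a source (id 1), a map (id 2) and a sink (id 3), calling transform on the map and then on the sink yields the nodes 1, 2, 3 and the edges 1->2 and 2->3; the source is picked up by the recursion and the map is not translated twice.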

    StreamOperator and StreamOperatorFactory

    As noted above, the key piece is the getOperatorFactory method. It returns a StreamOperatorFactory, which is an interface whose most important method is:

    /**
     * Create the operator. Sets access to the context and the output.
     */
    <T extends StreamOperator<OUT>> T createStreamOperator(
            StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);

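    Two classes implement this method: SimpleOperatorFactory and CodeGenOperatorFactory. Back in DataStream, the API methods only deal with SimpleOperatorFactory; CodeGenOperatorFactory is probably used mainly for SQL (this is a guess, I have not read that code yet).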

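    Then there is StreamOperator, which represents the various kinds of computation logic. It has many subclasses, so I won't list them one by one. The point to remember is that, at this stage, the StreamOperatorFactory is what carries the user's computation logic, and that is the thread we follow through the code.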

    Now, back to the StreamGraph.addOperator method:

    public <IN, OUT> void addOperator(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName) {

        // This is where the user logic is translated: addNode builds a StreamNode,
        // so the user logic ends up in the StreamGraph's collection of StreamNodes.
        if (operatorFactory.isStreamSource()) {
            addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorFactory, operatorName);
        } else {
            addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorFactory, operatorName);
        }

        TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null;

        TypeSerializer<OUT> outSerializer = outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo) ? outTypeInfo.createSerializer(executionConfig) : null;

        setSerializers(vertexID, inSerializer, null, outSerializer);

        if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
            // sets the output type which must be know at StreamGraph creation time
            operatorFactory.setOutputType(outTypeInfo, executionConfig);
        }

        if (operatorFactory.isInputTypeConfigurable()) {
            operatorFactory.setInputType(inTypeInfo, executionConfig);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Vertex: {}", vertexID);
        }
    }

    After the steps above, the user logic has been translated into the StreamGraph. The next step is StreamGraph -> JobGraph.

    The entry point is the StreamGraph.getJobGraph method:

    @Override
    public JobGraph getJobGraph(@Nullable JobID jobID) {
        // temporarily forbid checkpointing for iterative jobs
        if (isIterative() && checkpointConfig.isCheckpointingEnabled() && !checkpointConfig.isForceCheckpointing()) {
            throw new UnsupportedOperationException(
                "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
                    + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
                    + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
        }

        // core of the translation
        return StreamingJobGraphGenerator.createJobGraph(this, jobID);
    }

    // in StreamingJobGraphGenerator
    public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
        return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
    }

    So the logic finally lands in StreamingJobGraphGenerator.createJobGraph():

    private JobGraph createJobGraph() {

        // make sure that all vertices start immediately
        jobGraph.setScheduleMode(streamGraph.getScheduleMode());

        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();

        // this is the core place where the user logic is set
        setChaining(hashes, legacyHashes, chainedOperatorHashes);

        setPhysicalEdges();

        setSlotSharingAndCoLocation();

        configureCheckpointing();

        JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);

        // set the ExecutionConfig last when it has been finalized
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        }
        catch (IOException e) {
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                    "This indicates that non-serializable types (like custom serializers) were registered");
        }

        return jobGraph;
    }

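    How the user logic is carried over from the StreamGraph to the JobGraph is quite obscure. Reading the code directly, I could not spot it for a long time; I only found it by checking where the user logic is read back at execution time. Let's first look at the setChaining method.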

    /**
     * Sets up task chains from the source {@link StreamNode} instances.
     *
     * <p>This will recursively create all {@link JobVertex} instances.
     */
    private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
        for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
            // call createChain for every source node
            createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
        }
    }

    private List<StreamEdge> createChain(
            Integer startNodeId,
            Integer currentNodeId,
            Map<Integer, byte[]> hashes,
            List<Map<Integer, byte[]>> legacyHashes,
            int chainIndex,
            Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

        if (!builtVertices.contains(startNodeId)) {

            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

            StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

            for (StreamEdge outEdge : currentNode.getOutEdges()) {
                if (isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                } else {
                    nonChainableOutputs.add(outEdge);
                }
            }

            for (StreamEdge chainable : chainableOutputs) {
                transitiveOutEdges.addAll(
                        createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
            }

            for (StreamEdge nonChainable : nonChainableOutputs) {
                transitiveOutEdges.add(nonChainable);
                createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
            }

            List<Tuple2<byte[], byte[]>> operatorHashes =
                chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

            byte[] primaryHashBytes = hashes.get(currentNodeId);
            OperatorID currentOperatorId = new OperatorID(primaryHashBytes);

            for (Map<Integer, byte[]> legacyHash : legacyHashes) {
                operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
            }

            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
            chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

            if (currentNode.getInputFormat() != null) {
                getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
            }

            if (currentNode.getOutputFormat() != null) {
                getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
            }

            // If the current node is the head of a chain, create a JobVertex and return that
            // JobVertex's config, so any change to this StreamConfig is a change to the config
            // of the corresponding JobVertex.
            StreamConfig config = currentNodeId.equals(startNodeId)
                    ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                    : new StreamConfig(new Configuration());

            // This method serializes the user logic (StreamOperatorFactory) into the StreamConfig.
            setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

            if (currentNodeId.equals(startNodeId)) {

                config.setChainStart();
                config.setChainIndex(0);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                config.setOutEdgesInOrder(transitiveOutEdges);
                config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

                for (StreamEdge edge : transitiveOutEdges) {
                    connect(startNodeId, edge);
                }

                // For the chain head, take all the configs collected in chainedConfigs for this
                // chain and serialize them into the StreamConfig.
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

            } else {
                // For a chained (non-head) node, make sure the chainedConfigs map of its
                // start node exists.
                chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());

                config.setChainIndex(chainIndex);
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                // add this StreamConfig to the start node's chainedConfigs
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }

            config.setOperatorID(currentOperatorId);

            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            return transitiveOutEdges;

        } else {
            return new ArrayList<>();
        }
    }
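To make the recursion in createChain easier to follow, here is a heavily reduced sketch of the same idea: walk the outgoing edges, keep chainable targets in the current chain, and start a new chain at every non-chainable target. ChainSketch is a hypothetical class, and its isChainable rule (the target has exactly one input and the same parallelism) is an assumption chosen for the toy; Flink's real check has more conditions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of operator chaining.
final class ChainSketch {
    private final Map<Integer, List<Integer>> outEdges = new HashMap<>();
    private final Map<Integer, Integer> inDegree = new HashMap<>();
    private final Map<Integer, Integer> parallelism = new HashMap<>();
    private final Set<Integer> builtVertices = new HashSet<>();
    // chain head id -> ids of all nodes in that chain, head first
    final Map<Integer, List<Integer>> chains = new LinkedHashMap<>();

    void addNode(int id, int nodeParallelism) {
        parallelism.put(id, nodeParallelism);
        outEdges.put(id, new ArrayList<>());
        inDegree.put(id, 0);
    }

    void addEdge(int from, int to) {
        outEdges.get(from).add(to);
        inDegree.merge(to, 1, Integer::sum);
    }

    // Toy rule (assumption): single input and equal parallelism.
    private boolean isChainable(int from, int to) {
        return inDegree.get(to) == 1 && parallelism.get(from).equals(parallelism.get(to));
    }

    void createChain(int startId, int currentId) {
        if (builtVertices.contains(startId)) {
            return; // this chain was already built via another path
        }
        for (int target : outEdges.get(currentId)) {
            if (isChainable(currentId, target)) {
                createChain(startId, target); // stays in the current chain
            } else {
                createChain(target, target);  // target becomes the head of a new chain
            }
        }
        // Nodes are added on the way back up, so prepend to keep the head first.
        chains.computeIfAbsent(startId, k -> new ArrayList<>()).add(0, currentId);
        if (currentId == startId) {
            builtVertices.add(startId); // the "vertex" for this chain now exists
        }
    }
}
```

For a source and a map with parallelism 1 followed by a sink with parallelism 2, createChain(1, 1) produces two chains: one holding the source and the map, and one holding only the sink.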

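    The code above completes the translation of the user code: the user logic (StreamOperatorFactory) is serialized and written into the config of each JobVertex in the JobGraph. For the exact keys, see StreamConfig's setStreamOperatorFactory and setTransitiveChainedTaskConfigs methods. The former stores the StreamOperatorFactory; the latter stores the JobVertex's chainedConfig, which contains the logic of every Operator in the chain other than the head node.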
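The idea of shipping user logic as serialized bytes inside a key-value config can be shown with plain Java serialization. This is only a sketch of the principle: ConfigSketch, AppendBang and the key name "operator" are hypothetical, and StreamConfig's actual keys and helper methods are not reproduced here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical piece of "user logic" that must be serializable to travel in a config.
final class AppendBang implements Function<String, String>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public String apply(String value) {
        return value + "!";
    }
}

// Hypothetical config: a map from keys to bytes, loosely like a job vertex configuration.
final class ConfigSketch {
    static final String OPERATOR_KEY = "operator"; // placeholder key, not Flink's

    private final Map<String, byte[]> config = new HashMap<>();

    // Producer side: serialize the operator and store the bytes under a key.
    void setOperator(Serializable operator) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(operator);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        config.put(OPERATOR_KEY, bytes.toByteArray());
    }

    // Consumer side: read the bytes back and rebuild the operator.
    @SuppressWarnings("unchecked")
    Function<String, String> getOperator() {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(config.get(OPERATOR_KEY)))) {
            return (Function<String, String>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Operator class not found", e);
        }
    }
}
```

After config.setOperator(new AppendBang()), calling config.getOperator().apply("run") gives back "run!" from a freshly deserialized instance, which is roughly what happens to the operator factory between graph generation and task execution.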

    With the JobGraph built, the next step is to start the JobManager and TaskManager components and submit the JobGraph to the JobManager.

    At the end of LocalStreamEnvironment's execute method, a MiniCluster is created and miniCluster.executeJobBlocking is called.

    The JobManager Part

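    This ends up in MiniCluster's submitJob method. Roughly, it obtains the DispatcherGateway, resolves the BlobServer address used for uploading the job's JAR files, and finally calls submitJob on the DispatcherGateway.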

    public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
        // obtain the DispatcherGateway
        final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();

        // we have to allow queued scheduling in Flip-6 mode because we need to request slots
        // from the ResourceManager
        jobGraph.setAllowQueuedScheduling(true);

        // upload the JAR files
        final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);

        final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);

        // submit
        final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
            .thenCombine(
                dispatcherGatewayFuture,
                (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
            .thenCompose(Function.identity());

        return acknowledgeCompletableFuture.thenApply(
            (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
    }
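The thenCombine(...).thenCompose(Function.identity()) idiom in submitJob is worth a closer look: thenCombine waits for both the upload and the gateway, but because the combining function itself returns a future, the result is a nested future that thenCompose then flattens. The sketch below reproduces only that composition with JDK types; SubmitSketch and its string-based "gateway" are hypothetical stand-ins.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for the submit path; strings replace gateways and acknowledgements.
final class SubmitSketch {

    static CompletableFuture<String> submit(
            CompletableFuture<Void> uploadFuture,
            CompletableFuture<String> gatewayFuture,
            String jobName) {

        return uploadFuture
            // Runs once both futures are done; the lambda returns another future,
            // so the intermediate type is CompletableFuture<CompletableFuture<String>>.
            .thenCombine(
                gatewayFuture,
                (Void ignored, String gateway) ->
                    CompletableFuture.supplyAsync(() -> gateway + " accepted " + jobName))
            // Flattens the nested future into CompletableFuture<String>.
            .thenCompose(Function.identity());
    }
}
```

With two already completed inputs, submit(...).join() returns the acknowledgement string once the inner asynchronous call has finished.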

    The call eventually reaches Dispatcher.persistAndRunJob:

    private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
        submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph));

        // this is the method that starts the job
        final CompletableFuture<Void> runJobFuture = runJob(jobGraph);

        return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
            if (throwable != null) {
                submittedJobGraphStore.removeJobGraph(jobGraph.getJobID());
            }
        }));
    }

    private CompletableFuture<Void> runJob(JobGraph jobGraph) {
        Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

        // This step already builds the ExecutionGraph. The user logic still lives in the
        // JobVertex held by each ExecutionJobVertex; this step only adds parallelism and
        // scheduling information, so we skip the translation logic for now.
        final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

        jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

        // start the JobManager
        return jobManagerRunnerFuture
            .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
            .thenApply(FunctionUtils.nullFn())
            .whenCompleteAsync(
                (ignored, throwable) -> {
                    if (throwable != null) {
                        jobManagerRunnerFutures.remove(jobGraph.getJobID());
                    }
                },
                getMainThreadExecutor());
    }

    Let's look at how the JobManager is started:

    private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
        final JobID jobId = jobManagerRunner.getJobGraph().getJobID();

        // register what to do once the JobManager has finished
        FutureUtils.assertNoException(
            jobManagerRunner.getResultFuture().handleAsync(
                (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
                    // check if we are still the active JobManagerRunner by checking the identity
                    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
                    final JobManagerRunner currentJobManagerRunner = jobManagerRunnerFuture != null ? jobManagerRunnerFuture.getNow(null) : null;
                    //noinspection ObjectEquality
                    if (jobManagerRunner == currentJobManagerRunner) {
                        if (archivedExecutionGraph != null) {
                            jobReachedGloballyTerminalState(archivedExecutionGraph);
                        } else {
                            final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

                            if (strippedThrowable instanceof JobNotFinishedException) {
                                jobNotFinished(jobId);
                            } else {
                                jobMasterFailed(jobId, strippedThrowable);
                            }
                        }
                    } else {
                        log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
                    }

                    return null;
                }, getMainThreadExecutor()));

        // start the JobManager
        jobManagerRunner.start();

        return jobManagerRunner;
    }

    Starting the JobManager really just means registering itself as a contender in the leader election:

    public void start() throws Exception {
        try {
            // join the leader election
            leaderElectionService.start(this);
        } catch (Exception e) {
            log.error("Could not start the JobManager because the leader election service did not start.", e);
            throw new Exception("Could not start the leader election service.", e);
        }
    }

    So we go straight to what happens when the JobManager is elected leader:

    @Override
    public void grantLeadership(final UUID leaderSessionID) {
        synchronized (lock) {
            if (shutdown) {
                log.info("JobManagerRunner already shutdown.");
                return;
            }

            leadershipOperation = leadershipOperation.thenCompose(
                (ignored) -> {
                    synchronized (lock) {
                        // once elected leader, check the scheduling status and start the job
                        return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                    }
                });

            handleException(leadershipOperation, "Could not start the job manager.");
        }
    }

    private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
        final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

        return jobSchedulingStatusFuture.thenCompose(
            jobSchedulingStatus -> {
                // check the scheduling status
                if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                    return jobAlreadyDone();
                } else {
                    // schedule and start the job
                    return startJobMaster(leaderSessionId);
                }
            });
    }
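The control flow here (start means joining an election, and the real work only begins in the leadership callback, after a status check) can be mimicked in a few lines. In the sketch below, LeaderGateSketch, its Contender interface and the single-contender election are hypothetical; the assumption is that a lone contender is granted leadership immediately, which is a simplification and not a statement about how Flink's election services are implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch of "start == join election, work happens on grantLeadership".
final class LeaderGateSketch {

    enum SchedulingStatus { PENDING, DONE }

    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    // Toy election with a single contender, which therefore wins right away (assumption).
    static void startElection(Contender contender) {
        contender.grantLeadership(UUID.randomUUID());
    }

    // Returns the events recorded by the runner for the given scheduling status.
    static List<String> run(SchedulingStatus status) {
        List<String> events = new ArrayList<>();
        Contender runner = leaderSessionId -> {
            if (status == SchedulingStatus.DONE) {
                events.add("job already done");
            } else {
                events.add("start job master");
            }
        };
        startElection(runner);
        return events;
    }
}
```

Running it with PENDING records "start job master", while DONE records "job already done", matching the two branches of verifyJobSchedulingStatusAndStartJobManager.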

    Finally, the JobMaster's start method is called:

    private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
        log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
            jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());

        try {
            runningJobsRegistry.setJobRunning(jobGraph.getJobID());
        } catch (IOException e) {
            return FutureUtils.completedExceptionally(
                new FlinkException(
                    String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
                    e));
        }

        final CompletableFuture<Acknowledge> startFuture;
        try {
            // startup logic
            startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
        }

        final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
        return startFuture.thenAcceptAsync(
            (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
            executor);
    }

    Following the JobMaster startup logic, the call eventually reaches JobMaster.startJobExecution:

    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {

        validateRunsInMainThread();

        checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

        if (Objects.equals(getFencingToken(), newJobMasterId)) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

            return Acknowledge.get();
        }

        setNewFencingToken(newJobMasterId);

        // start and connect to some RPC services
        startJobMasterServices();

        log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);

        // schedule and start the tasks
        resetAndStartScheduler();

        return Acknowledge.get();
    }

    As we can see, the actual resource requests and startup logic are in the resetAndStartScheduler method:

    private void resetAndStartScheduler() throws Exception {
        validateRunsInMainThread();

        final CompletableFuture<Void> schedulerAssignedFuture;

        // initialize or reset the scheduling information
        if (schedulerNG.requestJobStatus() == JobStatus.CREATED) {
            schedulerAssignedFuture = CompletableFuture.completedFuture(null);
            schedulerNG.setMainThreadExecutor(getMainThreadExecutor());
        } else {
            suspendAndClearSchedulerFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
            final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
            final SchedulerNG newScheduler = createScheduler(newJobManagerJobMetricGroup);

            schedulerAssignedFuture = schedulerNG.getTerminationFuture().handle(
                (ignored, throwable) -> {
                    newScheduler.setMainThreadExecutor(getMainThreadExecutor());
                    assignScheduler(newScheduler, newJobManagerJobMetricGroup);
                    return null;
                }
            );
        }

        // start scheduling
        schedulerAssignedFuture.thenRun(this::startScheduling);
    }

    private void startScheduling() {
        checkState(jobStatusListener == null);
        // register self as job status change listener
        jobStatusListener = new JobManagerJobStatusListener();
        schedulerNG.registerJobStatusListener(jobStatusListener);

        schedulerNG.startScheduling();
    }

    // in the SchedulerNG implementation
    @Override
    public void startScheduling() {
        mainThreadExecutor.assertRunningInMainThread();

        try {
            executionGraph.scheduleForExecution();
        }
        catch (Throwable t) {
            executionGraph.failGlobal(t);
        }
    }

    // in ExecutionGraph
    public void scheduleForExecution() throws JobException {

        assertRunningInJobMasterMainThread();

        final long currentGlobalModVersion = globalModVersion;

        if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

            // the actual scheduling logic
            final CompletableFuture<Void> newSchedulingFuture = SchedulingUtils.schedule(
                scheduleMode,
                getAllExecutionVertices(),
                this);

            if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
                schedulingFuture = newSchedulingFuture;
                newSchedulingFuture.whenComplete(
                    (Void ignored, Throwable throwable) -> {
                        if (throwable != null) {
                            final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

                            if (!(strippedThrowable instanceof CancellationException)) {
                                // only fail if the scheduling future was not canceled
                                failGlobal(strippedThrowable);
                            }
                        }
                    });
            } else {
                newSchedulingFuture.cancel(false);
            }
        }
        else {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
    }

    As shown, the scheduling itself is implemented by SchedulingUtils.schedule:

    public static CompletableFuture<Void> schedule(
            ScheduleMode scheduleMode,
            final Iterable<ExecutionVertex> vertices,
            final ExecutionGraph executionGraph) {

        switch (scheduleMode) {
            case LAZY_FROM_SOURCES:
            case LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST:
                return scheduleLazy(vertices, executionGraph);

            case EAGER:
                // the scheduling mode commonly used for streaming
                return scheduleEager(vertices, executionGraph);

            default:
                throw new IllegalStateException(String.format("Schedule mode %s is invalid.", scheduleMode));
        }
    }

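    The first two cases are lazy modes, in which downstream tasks are scheduled and started only when they are needed. They are mainly used for batch jobs, so we skip them for now and go straight to the EAGER scheduling logic.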

    The main logic is:

    1. Request the resources for all tasks.
    2. Once the resources have been allocated, call the deploy method to run the user logic on them.

    public static CompletableFuture<Void> scheduleEager(
            final Iterable<ExecutionVertex> vertices,
            final ExecutionGraph executionGraph) {

        executionGraph.assertRunningInJobMasterMainThread();

        checkState(executionGraph.getState() == JobStatus.RUNNING, "job is not running currently");

        // Important: reserve all the space we need up front.
        // that way we do not have any operation that can fail between allocating the slots
        // and adding them to the list. If we had a failure in between there, that would
        // cause the slots to get lost

        // collecting all the slots may resize and fail in that operation without slots getting lost
        final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>();

        final SlotProviderStrategy slotProviderStrategy = executionGraph.getSlotProviderStrategy();
        final Set<AllocationID> allPreviousAllocationIds = Collections.unmodifiableSet(
            computePriorAllocationIdsIfRequiredByScheduling(vertices, slotProviderStrategy.asSlotProvider()));

        // request the resources needed by all tasks
        // allocate the slots (obtain all their futures)
        for (ExecutionVertex ev : vertices) {
            // these calls are not blocking, they only return futures
            CompletableFuture<Execution> allocationFuture = ev.getCurrentExecutionAttempt().allocateResourcesForExecution(
                slotProviderStrategy,
                LocationPreferenceConstraint.ALL,
                allPreviousAllocationIds);

            allAllocationFutures.add(allocationFuture);
        }

        // this future is complete once all slot futures are complete.
        // the future fails once one slot future fails.
        final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

        // once all resources have been allocated, call deploy
        return allAllocationsFuture.thenAccept(
            (Collection<Execution> executionsToDeploy) -> {
                for (Execution execution : executionsToDeploy) {
                    try {
                        execution.deploy();
                    } catch (Throwable t) {
                        throw new CompletionException(
                            new FlinkException(
                                String.format("Could not deploy execution %s.", execution),
                                t));
                    }
                }
            })
            // Generate a more specific failure message for the eager scheduling
            .exceptionally(
                (Throwable throwable) -> {
                    final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
                    final Throwable resultThrowable;
                    if (strippedThrowable instanceof TimeoutException) {
                        int numTotal = allAllocationsFuture.getNumFuturesTotal();
                        int numComplete = allAllocationsFuture.getNumFuturesCompleted();

                        String message = "Could not allocate all requires slots within timeout of "
                            + executionGraph.getAllocationTimeout() + ". Slots required: "
                            + numTotal + ", slots allocated: " + numComplete
                            + ", previous allocation IDs: " + allPreviousAllocationIds;

                        StringBuilder executionMessageBuilder = new StringBuilder();

                        for (int i = 0; i < allAllocationFutures.size(); i++) {
                            CompletableFuture<Execution> executionFuture = allAllocationFutures.get(i);

                            try {
                                Execution execution = executionFuture.getNow(null);
                                if (execution != null) {
                                    executionMessageBuilder.append("completed: " + execution);
                                } else {
                                    executionMessageBuilder.append("incomplete: " + executionFuture);
                                }
                            } catch (CompletionException completionException) {
                                executionMessageBuilder.append("completed exceptionally: "
                                    + completionException + "/" + executionFuture);
                            }

                            if (i < allAllocationFutures.size() - 1) {
                                executionMessageBuilder.append(", ");
                            }
                        }

                        message += ", execution status: " + executionMessageBuilder.toString();

                        resultThrowable = new NoResourceAvailableException(message);
                    } else {
                        resultThrowable = strippedThrowable;
                    }

                    throw new CompletionException(resultThrowable);
                });
    }
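The two steps of eager scheduling (collect one allocation future per task, then deploy only after all of them have completed) map directly onto JDK futures. The sketch below uses CompletableFuture.allOf as a stand-in for Flink's FutureUtils.combineAll; EagerSketch and the string-based "slots" are hypothetical, and the error handling of the real method is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of "allocate everything first, then deploy".
final class EagerSketch {

    static CompletableFuture<List<String>> scheduleEager(List<String> tasks) {
        // Step 1: request a slot for every task; these calls only return futures.
        List<CompletableFuture<String>> allocations = new ArrayList<>();
        for (String task : tasks) {
            allocations.add(CompletableFuture.supplyAsync(() -> task + "@slot"));
        }

        // Completes once all allocations are complete, and fails if any of them fails.
        CompletableFuture<Void> allAllocated =
            CompletableFuture.allOf(allocations.toArray(new CompletableFuture<?>[0]));

        // Step 2: deploy each task, in the original order, once everything is allocated.
        return allAllocated.thenApply(ignored -> {
            List<String> deployed = new ArrayList<>();
            for (CompletableFuture<String> allocation : allocations) {
                deployed.add("deployed " + allocation.join()); // already complete, does not block
            }
            return deployed;
        });
    }
}
```

The design choice this illustrates is that no task is deployed until every slot is available, so a streaming job either starts as a whole or fails during allocation.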

    We will cover resource allocation some other time; for now, let's look at how the user code is deployed:

    public void deploy() throws JobException {
        ...
        try {

            // race double check, did we fail/cancel and do we need to release the slot?
            if (this.state != DEPLOYING) {
                slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
                return;
            }

            if (LOG.isInfoEnabled()) {
                LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
                        attemptNumber, getAssignedResourceLocation()));
            }

            // build the deployment request
            final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory
                .fromExecutionVertex(vertex, attemptNumber)
                .createDeploymentDescriptor(
                    slot.getAllocationId(),
                    slot.getPhysicalSlotNumber(),
                    taskRestore,
                    producedPartitions.values());

            // null taskRestore to let it be GC'ed
            taskRestore = null;

            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
                vertex.getExecutionGraph().getJobMasterMainThreadExecutor();

            // We run the submission in the future executor so that the serialization of large TDDs does not block
            // the main thread and sync back to the main thread once submission is completed.
            // submit the task to the TaskManager through an RPC call
            CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
                .thenCompose(Function.identity())
                .whenCompleteAsync(
                    (ack, failure) -> {
                        // only respond to the failure case
                        if (failure != null) {
                            if (failure instanceof TimeoutException) {
                                String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                                markFailed(new Exception(
                                    "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                        + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                            } else {
                                markFailed(failure);
                            }
                        }
                    },
                    jobMasterMainThreadExecutor);

        }
        catch (Throwable t) {
            markFailed(t);
            ExceptionUtils.rethrow(t);
        }
    }

    The TaskManager Part

    The call finally arrives at the TaskExecutor.submitTask method:

    @Override
    public CompletableFuture<Acknowledge> submitTask(
            TaskDeploymentDescriptor tdd,
            JobMasterId jobMasterId,
            Time timeout) {

        try {
            final JobID jobId = tdd.getJobId();
            final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);

            if (jobManagerConnection == null) {
                final String message = "Could not submit task because there is no JobManager " +
                    "associated for the job " + jobId + '.';

                log.debug(message);
                throw new TaskSubmissionException(message);
            }

            if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
                final String message = "Rejecting the task submission because the job manager leader id " +
                    jobMasterId + " does not match the expected job manager leader id " +
                    jobManagerConnection.getJobMasterId() + '.';

                log.debug(message);
                throw new TaskSubmissionException(message);
            }

            if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
                final String message = "No task slot allocated for job ID " + jobId +
                    " and allocation ID " + tdd.getAllocationId() + '.';
                log.debug(message);
                throw new TaskSubmissionException(message);
            }

            // re-integrate offloaded data:
            try {
                tdd.loadBigData(blobCacheService.getPermanentBlobService());
            } catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException("Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
            }

            // deserialize the pre-serialized information
            // Deserialize the JobInformation and TaskInformation from the tdd;
            // the user logic is contained in the TaskInformation.
            final JobInformation jobInformation;
            final TaskInformation taskInformation;
            try {
                jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
                taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
            } catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
            }

            ...

            // Build the Task. Task implements Runnable; this is what actually runs
            // the user code in its own thread.
            Task task = new Task(
                jobInformation,
                taskInformation,
                tdd.getExecutionAttemptId(),
                tdd.getAllocationId(),
                tdd.getSubtaskIndex(),
                tdd.getAttemptNumber(),
                tdd.getProducedPartitions(),
                tdd.getInputGates(),
                tdd.getTargetSlotNumber(),
                taskExecutorServices.getMemoryManager(),
                taskExecutorServices.getIOManager(),
                taskExecutorServices.getShuffleEnvironment(),
                taskExecutorServices.getKvStateService(),
                taskExecutorServices.getBroadcastVariableManager(),
                taskExecutorServices.getTaskEventDispatcher(),
                taskStateManager,
                taskManagerActions,
                inputSplitProvider,
                checkpointResponder,
                aggregateManager,
                blobCacheService,
                libraryCache,
                fileCache,
                taskManagerConfiguration,
                taskMetricGroup,
                resultPartitionConsumableNotifier,
                partitionStateChecker,
                getRpcService().getExecutor());

            log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());

            boolean taskAdded;

            try {
                taskAdded = taskSlotTable.addTask(task);
            } catch (SlotNotFoundException | SlotNotActiveException e) {
                throw new TaskSubmissionException("Could not submit task.", e);
            }

            if (taskAdded) {
                // start the task
                task.startTaskThread();
                taskCompletionTracker.trackTaskCompletion(task);

                setupResultPartitionBookkeeping(
                    tdd.getJobId(),
                    tdd.getProducedPartitions(),
                    task.getTerminationFuture());
                return CompletableFuture.completedFuture(Acknowledge.get());
            } else {
                final String message = "TaskManager already contains a task for id " +
                    task.getExecutionId() + '.';

                log.debug(message);
                throw new TaskSubmissionException(message);
            }
        } catch (TaskSubmissionException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

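    Finally, let's go straight to what Task's run method actually does. The full execution logic of a task is fairly long, so we only look at a few key points (the excerpt is from doRun):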

    private void doRun() {
        // This step builds the execution layer for the user code, i.e. the invokable

        // Make sure the user code classloader is accessible thread-locally.
        // We are setting the correct context class loader before instantiating the invokable
        // so that it is available to the invokable during its entire lifetime.
        executingThread.setContextClassLoader(userCodeClassLoader);

        // now load and instantiate the task's invokable code
        invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

        // ----------------------------------------------------------------
        //  actual task core work
        // ----------------------------------------------------------------

        // we must make strictly sure that the invokable is accessible to the cancel() call
        // by the time we switched to running.
        this.invokable = invokable;

        // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            throw new CancelTaskException();
        }

        // notify everyone that we switched to running
        taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

        // make sure the user code classloader is accessible thread-locally
        executingThread.setContextClassLoader(userCodeClassLoader);

        // This is where the code actually runs
        // run the invokable
        invokable.invoke();
    }
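    The two ideas in doRun(), setting the context class loader on the task thread and instantiating a class that is known only by name, can be sketched with plain reflection. Everything below is hypothetical (the Invokable interface, HelloInvokable, and the String "environment" are made up for illustration); the real loadAndInstantiateInvokable is not shown in this article and may differ.

```java
import java.lang.reflect.Constructor;

// Hypothetical sketch; the interface and class names below are invented for illustration.
class InvokableLoadingSketch {

    /** Plays the role of AbstractInvokable: something the task thread can invoke. */
    interface Invokable {
        String invoke();
    }

    /** Plays the role of a concrete task class that is only known by name at runtime. */
    public static class HelloInvokable implements Invokable {
        private final String env;

        public HelloInvokable(String env) {
            this.env = env;
        }

        @Override
        public String invoke() {
            return "hello from " + env;
        }
    }

    // Loads the class through the given class loader and calls its (String) constructor.
    static Invokable loadAndInstantiate(ClassLoader classLoader, String className, String env) {
        try {
            Class<? extends Invokable> clazz =
                Class.forName(className, true, classLoader).asSubclass(Invokable.class);
            Constructor<? extends Invokable> constructor = clazz.getConstructor(String.class);
            return constructor.newInstance(env);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    // Runs the invokable on a dedicated thread whose context class loader is set first.
    static String runOnTaskThread(String className) {
        ClassLoader userCodeClassLoader = InvokableLoadingSketch.class.getClassLoader();
        String[] result = new String[1];
        Thread taskThread = new Thread(() -> {
            Thread.currentThread().setContextClassLoader(userCodeClassLoader);
            Invokable invokable = loadAndInstantiate(userCodeClassLoader, className, "task-thread");
            result[0] = invokable.invoke();
        });
        taskThread.start();
        try {
            taskThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(runOnTaskThread(HelloInvokable.class.getName()));
    }
}
```

    Setting the context class loader before instantiation matters because code running inside the invokable may look classes up through the thread rather than through its own defining loader.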

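    invokable is an AbstractInvokable or one of its subclasses. Let's look at the most commonly used implementation, StreamTask; what this class ultimately ends up calling is its StreamTask.run method: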

    private void run() throws Exception {
        final ActionContext actionContext = new ActionContext();
        while (true) {
            // Flink's mailbox logic: if there are runnables waiting to be executed, run them first.
            // This turns the whole StreamTask execution into a single-thread-plus-messages model;
            // the details are left for another time.
            if (mailbox.hasMail()) {
                Optional<Runnable> maybeLetter;
                while ((maybeLetter = mailbox.tryTakeMail()).isPresent()) {
                    Runnable letter = maybeLetter.get();
                    if (letter == POISON_LETTER) {
                        return;
                    }
                    letter.run();
                }
            }
            // Processing logic
            processInput(actionContext);
        }
    }

    protected void processInput(ActionContext context) throws Exception {
        // Processing logic
        if (!inputProcessor.processInput()) {
            // No more input: clear out the messages in the mailbox
            context.allActionsCompleted();
        }
    }
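    To make the "single thread plus messages" model concrete, here is a minimal sketch of such a loop built on a plain queue. MailboxLoopSketch and its members are hypothetical names. The sketch also assumes that end of input is signalled by clearing the queue and posting the poison letter, which is one plausible way for the loop to terminate and is not taken from the code shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of a mailbox-style task loop; not Flink code.
class MailboxLoopSketch {

    // Sentinel letter; compared by identity, never executed.
    private static final Runnable POISON_LETTER = () -> { };

    private final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    private final Iterator<Integer> input;
    final List<String> log = new ArrayList<>();

    MailboxLoopSketch(Iterator<Integer> input) {
        this.input = input;
    }

    // Other threads would call this to have work executed on the task thread.
    void send(Runnable letter) {
        mailbox.add(letter);
    }

    // The single-threaded loop: drain pending letters, then process one input element.
    void run() {
        while (true) {
            Runnable letter;
            while ((letter = mailbox.poll()) != null) {
                if (letter == POISON_LETTER) {
                    return;
                }
                letter.run();
            }
            processInput();
        }
    }

    private void processInput() {
        if (input.hasNext()) {
            log.add("record " + input.next());
        } else {
            // Assumption of this sketch: end of input clears the mailbox and stops the loop.
            mailbox.clear();
            mailbox.add(POISON_LETTER);
        }
    }

    static List<String> demo() {
        MailboxLoopSketch task = new MailboxLoopSketch(Arrays.asList(1, 2).iterator());
        task.send(() -> task.log.add("letter: checkpoint"));
        task.run();
        return task.log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [letter: checkpoint, record 1, record 2]
    }
}
```

    Because letters and records are handled by the same thread, the letter's action never runs concurrently with record processing, which is the point of the design.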

    inputProcessor is a StreamInputProcessor; let's look at the StreamOneInputProcessor implementation:

    public boolean processInput() throws Exception {
        initializeNumRecordsIn();

        // Fetch the next element; the details of how are skipped for now
        StreamElement recordOrMark = input.pollNextNullable();
        if (recordOrMark == null) {
            input.isAvailable().get();
            return !checkFinished();
        }
        int channel = input.getLastChannel();
        checkState(channel != StreamTaskInput.UNSPECIFIED);

        // Process the element
        processElement(recordOrMark, channel);
        return true;
    }

    private void processElement(StreamElement recordOrMark, int channel) throws Exception {
        // If it is a business record, call the processing logic
        if (recordOrMark.isRecord()) {
            // now we can do the actual processing
            StreamRecord<IN> record = recordOrMark.asRecord();
            synchronized (lock) {
                numRecordsIn.inc();
                streamOperator.setKeyContextElement1(record);
                streamOperator.processElement(record);
            }
        }
        // Watermarks and other non-record elements are handled in the remaining branches
        else if (recordOrMark.isWatermark()) {
            // handle watermark
            statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), channel);
        } else if (recordOrMark.isStreamStatus()) {
            // handle stream status
            statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), channel);
        } else if (recordOrMark.isLatencyMarker()) {
            // handle latency marker
            synchronized (lock) {
                streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
            }
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }

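    In the end, the data is processed by calling streamOperator's processElement. This streamOperator is the headOperator of the whole OperatorChain. Next, let's look at how data flows inside the OperatorChain, taking one StreamOperator implementation, StreamMap, as the example: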

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }

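    Finally, output.collect is called to hand the data to the next StreamOperator. The excerpt shows the overload of collect that takes an OutputTag; it ends in pushToOperator, which is the step that invokes the next operator: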

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
            // we are only responsible for emitting to the side-output specified by our
            // OutputTag.
            return;
        }

        pushToOperator(record);
    }

    protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            // we know that the given outputTag matches our OutputTag so the record
            // must be of the type that our operator expects.
            @SuppressWarnings("unchecked")
            StreamRecord<T> castRecord = (StreamRecord<T>) record;

            numRecordsIn.inc();
            // The next operator processes the record; that operator in turn holds a similar Output
            // that collects the data to its own downstream operator
            operator.setKeyContextElement1(castRecord);
            operator.processElement(castRecord);
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }
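    The way a record travels through a chain, with each operator calling output.collect and each output calling the next operator's processElement, comes down to plain nested method calls. The sketch below models that with hypothetical, heavily simplified types (Output, Operator, MapOperator, ChainedOutput); it leaves out StreamRecord, key context, metrics, and side outputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of an operator chain; not Flink's actual classes.
class OperatorChainSketch {

    /** Receives the records an operator emits. */
    interface Output<T> {
        void collect(T record);
    }

    /** Processes one record at a time. */
    interface Operator<IN> {
        void processElement(IN record);
    }

    /** Applies the user function and emits the result, like StreamMap. */
    static class MapOperator<IN, OUT> implements Operator<IN> {
        private final Function<IN, OUT> userFunction;
        private final Output<OUT> output;

        MapOperator(Function<IN, OUT> userFunction, Output<OUT> output) {
            this.userFunction = userFunction;
            this.output = output;
        }

        @Override
        public void processElement(IN record) {
            output.collect(userFunction.apply(record));
        }
    }

    /** An output that pushes each record straight into the next operator of the chain. */
    static class ChainedOutput<T> implements Output<T> {
        private final Operator<T> operator;

        ChainedOutput(Operator<T> operator) {
            this.operator = operator;
        }

        @Override
        public void collect(T record) {
            operator.processElement(record);
        }
    }

    static List<String> demo() {
        List<String> sink = new ArrayList<>();
        // Build the chain from the tail: the second map writes to the sink,
        // the head map writes into the second map.
        MapOperator<Integer, String> second =
            new MapOperator<Integer, String>(i -> "value=" + i, sink::add);
        MapOperator<Integer, Integer> head =
            new MapOperator<Integer, Integer>(i -> i * 2, new ChainedOutput<Integer>(second));
        head.processElement(1);
        head.processElement(21);
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [value=2, value=42]
    }
}
```

    Within the chain a record is passed by direct method call on the same thread, with no serialization or queueing between the operators.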