Analysis of the Flink on YARN Startup Flow

    This article walks through what happens when a Flink on YARN job is submitted, using job (per-job) mode as the example.

    CliFrontend is the entry point of the command line. For a run command the overall call chain is: run -> runProgram -> executeProgram -> ClusterClient.run

    The YARN-related logic lives in the runProgram method:

    // Obtain the active customCommandLine
    final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
    try {
        runProgram(customCommandLine, commandLine, runOptions, program);
    } finally {
        program.deleteExtractedLibraries();
    }

    private <T> void runProgram(
            CustomCommandLine<T> customCommandLine,
            CommandLine commandLine,
            RunOptions runOptions,
            PackagedProgram program) throws ProgramInvocationException, FlinkException {
        // Obtain the ClusterDescriptor from the customCommandLine
        final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
        ...
        // Deploy the cluster
        client = clusterDescriptor.deployJobCluster(
                clusterSpecification,
                jobGraph,
                runOptions.getDetachedMode());
        // Execute the job-submission logic
        executeProgram(program, client, userParallelism);
        ...
    }

    As shown above, cluster deployment is delegated to the ClusterDescriptor returned by customCommandLine.createClusterDescriptor, so let's first look at how the active customCommandLine is picked:

    public CustomCommandLine<?> getActiveCustomCommandLine(CommandLine commandLine) {
        for (CustomCommandLine<?> cli : customCommandLines) {
            if (cli.isActive(commandLine)) {
                return cli;
            }
        }
        throw new IllegalStateException("No command-line ran.");
    }

    It simply iterates over all registered CustomCommandLines and returns the first active one. The customCommandLines list is populated in CliFrontend.loadCustomCommandLines:

    public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
        List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);

        // Command line interface of the YARN session, with a special initialization here
        // to prefix all options with y/yarn.
        // Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
        // active CustomCommandLine in order and DefaultCLI isActive always return true.
        final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
        try {
            customCommandLines.add(
                loadCustomCommandLine(flinkYarnSessionCLI,
                    configuration,
                    configurationDirectory,
                    "y",
                    "yarn"));
        } catch (NoClassDefFoundError | Exception e) {
            LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
        }

        customCommandLines.add(new DefaultCLI(configuration));

        return customCommandLines;
    }

    So there are just two CustomCommandLines: a DefaultCLI and a FlinkYarnSessionCli. Let's go straight to FlinkYarnSessionCli's isActive method:

    @Override
    public boolean isActive(CommandLine commandLine) {
        String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
        boolean yarnJobManager = ID.equals(jobManagerOption);
        boolean yarnAppId = commandLine.hasOption(applicationId.getOpt());
        return yarnJobManager || yarnAppId || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);
    }

    As the code shows, one of the activating conditions is that the -m option equals yarn-cluster (private static final String ID = "yarn-cluster"), i.e. flink run -m yarn-cluster ...
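
    For orientation, here is a trimmed sketch of the CustomCommandLine contract that both implementations fulfil (paraphrased from Flink 1.8; the exact method set varies across versions):

    public interface CustomCommandLine<T> {

        // Signals whether this CLI should handle the given command line.
        boolean isActive(CommandLine commandLine);

        // Unique identifier of this CLI.
        String getId();

        // Hooks for contributing extra options (the y/yarn-prefixed ones for YARN).
        void addRunOptions(Options baseOptions);
        void addGeneralOptions(Options baseOptions);

        // Creates the descriptor that knows how to deploy or retrieve a cluster.
        ClusterDescriptor<T> createClusterDescriptor(CommandLine commandLine) throws FlinkException;
    }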

    FlinkYarnSessionCli's createClusterDescriptor method ultimately returns a YarnClusterDescriptor, so we go straight to its deployJobCluster method, which in the end calls AbstractYarnClusterDescriptor.startAppMaster:

    public ApplicationReport startAppMaster(
            Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification) throws Exception {
        ...
        // Build the launch context that starts the cluster entry point
        final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
                yarnClusterEntrypoint,
                hasLogback,
                hasLog4j,
                hasKrb5,
                clusterSpecification.getMasterMemoryMB());
        ...
        // set classpath from YARN configuration
        Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);

        amContainer.setEnvironment(appMasterEnv);

        // Set up resource type requirements for ApplicationMaster
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(clusterSpecification.getMasterMemoryMB());
        capability.setVirtualCores(flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));

        final String customApplicationName = customName != null ? customName : applicationName;

        appContext.setApplicationName(customApplicationName);
        appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
        // Install the ApplicationMaster launch context
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);

        if (yarnQueue != null) {
            appContext.setQueue(yarnQueue);
        }

        setApplicationNodeLabel(appContext);
        setApplicationTags(appContext);

        // add a hook to clean up in case deployment fails
        Thread deploymentFailureHook = new DeploymentFailureHook(yarnApplication, yarnFilesDir);
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);

        LOG.info("Submitting application master " + appId);
        // Submit the application
        yarnClient.submitApplication(appContext);
        ...
    }

    startAppMaster does a lot of work; the core of it is building a ContainerLaunchContext and submitting it, as part of the application submission context, as a YARN application. Here is how the ContainerLaunchContext is constructed:

    protected ContainerLaunchContext setupApplicationMasterContainer(
            String yarnClusterEntrypoint,
            boolean hasLogback,
            boolean hasLog4j,
            boolean hasKrb5,
            int jobManagerMemoryMb) {
        // ------------------ Prepare Application Master Container ------------------------------

        // respect custom JVM options in the YAML file
        String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
        if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
            javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
        }
        // applicable only for YarnMiniCluster secure test run
        // krb5.conf file will be available as local resource in JM/TM container
        if (hasKrb5) {
            javaOpts += " -Djava.security.krb5.conf=krb5.conf";
        }

        // Set up the container launch context for the application master
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

        final Map<String, String> startCommandValues = new HashMap<>();
        startCommandValues.put("java", "$JAVA_HOME/bin/java");

        int heapSize = Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration);
        String jvmHeapMem = String.format("-Xms%sm -Xmx%sm", heapSize, heapSize);
        startCommandValues.put("jvmmem", jvmHeapMem);

        startCommandValues.put("jvmopts", javaOpts);

        String logging = "";
        if (hasLogback || hasLog4j) {
            logging = "-Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";

            if (hasLogback) {
                logging += " -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME;
            }

            if (hasLog4j) {
                logging += " -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME;
            }
        }
        startCommandValues.put("logging", logging);

        // Use yarnClusterEntrypoint as the entry class of the container start command
        startCommandValues.put("class", yarnClusterEntrypoint);
        startCommandValues.put("redirects",
            "1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
            "2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err");
        startCommandValues.put("args", "");

        final String commandTemplate = flinkConfiguration
            .getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
                ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
        // Build the start command from the template and the values above
        final String amCommand = BootstrapTools.getStartCommand(commandTemplate, startCommandValues);

        // Set the command on the container
        amContainer.setCommands(Collections.singletonList(amCommand));

        LOG.debug("Application Master start command: " + amCommand);

        return amContainer;
    }
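
    The final command is produced by plain placeholder substitution. Below is a minimal, self-contained re-implementation of that substitution for illustration (my own sketch, not Flink's code; the template string follows ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE as of Flink 1.8, and the heap size is an example value):

    import java.util.HashMap;
    import java.util.Map;

    public class StartCommandSketch {

        // Same contract as BootstrapTools.getStartCommand: replace every %key%
        // placeholder in the template with its value from the map.
        static String getStartCommand(String template, Map<String, String> values) {
            String command = template;
            for (Map.Entry<String, String> entry : values.entrySet()) {
                command = command.replace("%" + entry.getKey() + "%", entry.getValue());
            }
            return command;
        }

        public static void main(String[] args) {
            String template = "%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%";

            Map<String, String> values = new HashMap<>();
            values.put("java", "$JAVA_HOME/bin/java");
            values.put("jvmmem", "-Xms424m -Xmx424m"); // example heap size
            values.put("jvmopts", "");
            values.put("logging", "-Dlog.file=\"<LOG_DIR>/jobmanager.log\"");
            values.put("class", "org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint");
            values.put("args", "");
            values.put("redirects", "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err");

            // Prints the kind of AM start command YARN ends up executing
            System.out.println(getStartCommand(template, values));
        }
    }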

    Finally, where does yarnClusterEntrypoint come from? It is simply the class name of YarnSessionClusterEntrypoint (session mode) or YarnJobClusterEntrypoint (job mode). In other words, the ApplicationMaster submitted to YARN just runs the main method of YarnJobClusterEntrypoint/YarnSessionClusterEntrypoint. Let's look at YarnJobClusterEntrypoint's main method:

    public static void main(String[] args) {
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        Map<String, String> env = System.getenv();

        final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
        Preconditions.checkArgument(
            workingDirectory != null,
            "Working directory variable (%s) not set",
            ApplicationConstants.Environment.PWD.key());

        try {
            YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
        } catch (IOException e) {
            LOG.warn("Could not log YARN environment information.", e);
        }

        Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);

        YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(
            configuration,
            workingDirectory);
        // Run the cluster entrypoint
        ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
    }

    runClusterEntrypoint eventually calls ClusterEntrypoint.runCluster:

    private void runCluster(Configuration configuration) throws Exception {
        synchronized (lock) {
            initializeServices(configuration);

            // write host information into configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            // Build the factory that creates the cluster components
            final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

            // Call the factory's create method
            clusterComponent = dispatcherResourceManagerComponentFactory.create(
                configuration,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                archivedExecutionGraphStore,
                new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                this);

            clusterComponent.getShutDownFuture().whenComplete(
                (ApplicationStatus applicationStatus, Throwable throwable) -> {
                    if (throwable != null) {
                        shutDownAsync(
                            ApplicationStatus.UNKNOWN,
                            ExceptionUtils.stringifyException(throwable),
                            false);
                    } else {
                        // This is the general shutdown path. If a separate more specific shutdown was
                        // already triggered, this will do nothing
                        shutDownAsync(
                            applicationStatus,
                            null,
                            true);
                    }
                });
        }
    }

    DispatcherResourceManagerComponentFactory has two concrete subclasses, JobDispatcherResourceManagerComponentFactory and SessionDispatcherResourceManagerComponentFactory, corresponding to job and session mode respectively. Every ClusterEntrypoint subclass implements createDispatcherResourceManagerComponentFactory; the real difference between them is which ResourceManagerFactory they construct, i.e. how resources are acquired.
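
    As an illustration, the job-mode override on YARN wires in the YARN-specific factory roughly as follows (paraphrased from Flink 1.8's YarnJobClusterEntrypoint; the exact accessor names may differ between versions):

    @Override
    protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
        // Job mode: a YARN ResourceManager plus a dispatcher that runs the single
        // JobGraph recovered from the files shipped with the application.
        return new JobDispatcherResourceManagerComponentFactory(
            YarnResourceManagerFactory.getInstance(),
            FileJobGraphRetriever.createFrom(configuration));
    }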

    Next, let's look at how the individual components are created:

    public DispatcherResourceManagerComponent<T> create(
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            ArchivedExecutionGraphStore archivedExecutionGraphStore,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler) throws Exception {

        // Leader retrieval services, for HA
        LeaderRetrievalService dispatcherLeaderRetrievalService = null;
        LeaderRetrievalService resourceManagerRetrievalService = null;
        // Backend of the web UI
        WebMonitorEndpoint<U> webMonitorEndpoint = null;
        // Resource management
        ResourceManager<?> resourceManager = null;
        // Metrics
        JobManagerMetricGroup jobManagerMetricGroup = null;
        // The dispatcher that handles external requests
        T dispatcher = null;

        try {
            dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

            resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

            final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                rpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                10,
                Time.milliseconds(50L));

            final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                rpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                10,
                Time.milliseconds(50L));

            final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
                configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                "DispatcherRestEndpoint");

            final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
            final MetricFetcher metricFetcher = updateInterval == 0
                ? VoidMetricFetcher.INSTANCE
                : MetricFetcherImpl.fromConfiguration(
                    configuration,
                    metricQueryServiceRetriever,
                    dispatcherGatewayRetriever,
                    executor);

            webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getWebMonitorLeaderElectionService(),
                fatalErrorHandler);

            log.debug("Starting Dispatcher REST endpoint.");
            webMonitorEndpoint.start();

            final String hostname = getHostname(rpcService);

            jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
                metricRegistry,
                hostname,
                ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

            // Build the ResourceManager
            resourceManager = resourceManagerFactory.createResourceManager(
                configuration,
                ResourceID.generate(),
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                metricRegistry,
                fatalErrorHandler,
                new ClusterInformation(hostname, blobServer.getPort()),
                webMonitorEndpoint.getRestBaseUrl(),
                jobManagerMetricGroup);

            final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

            // Build the Dispatcher
            dispatcher = dispatcherFactory.createDispatcher(
                configuration,
                rpcService,
                highAvailabilityServices,
                resourceManagerGatewayRetriever,
                blobServer,
                heartbeatServices,
                jobManagerMetricGroup,
                metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
                archivedExecutionGraphStore,
                fatalErrorHandler,
                historyServerArchivist);

            log.debug("Starting ResourceManager.");
            resourceManager.start();
            resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

            log.debug("Starting Dispatcher.");
            dispatcher.start();
            dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

            return createDispatcherResourceManagerComponent(
                dispatcher,
                resourceManager,
                dispatcherLeaderRetrievalService,
                resourceManagerRetrievalService,
                webMonitorEndpoint,
                jobManagerMetricGroup);

        } catch (Exception exception) {
            // clean up all started components
            if (dispatcherLeaderRetrievalService != null) {
                try {
                    dispatcherLeaderRetrievalService.stop();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }

            if (resourceManagerRetrievalService != null) {
                try {
                    resourceManagerRetrievalService.stop();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }

            final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

            if (webMonitorEndpoint != null) {
                terminationFutures.add(webMonitorEndpoint.closeAsync());
            }

            if (resourceManager != null) {
                terminationFutures.add(resourceManager.closeAsync());
            }

            if (dispatcher != null) {
                terminationFutures.add(dispatcher.closeAsync());
            }

            final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures);

            try {
                terminationFuture.get();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }

            if (jobManagerMetricGroup != null) {
                jobManagerMetricGroup.close();
            }

            throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
        }
    }

    At this point the cluster is fully started. Now we return to deployJobCluster to see how the ClusterClient is built:

    protected ClusterClient<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached) throws Exception {
        ...
        // the Flink cluster is deployed in YARN. Represent cluster
        return createYarnClusterClient(
            this,
            validClusterSpecification.getNumberTaskManagers(),
            validClusterSpecification.getSlotsPerTaskManager(),
            report,
            flinkConfiguration,
            true);
    }

    @Override
    protected ClusterClient<ApplicationId> createYarnClusterClient(
            AbstractYarnClusterDescriptor descriptor,
            int numberTaskManagers,
            int slotsPerTaskManager,
            ApplicationReport report,
            Configuration flinkConfiguration,
            boolean perJobCluster) throws Exception {
        return new RestClusterClient<>(
            flinkConfiguration,
            report.getApplicationId());
    }

    RestClusterClient(
            Configuration configuration,
            @Nullable RestClient restClient,
            T clusterId,
            WaitStrategy waitStrategy,
            @Nullable LeaderRetrievalService webMonitorRetrievalService) throws Exception {
        super(configuration);

        this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);

        if (restClient != null) {
            this.restClient = restClient;
        } else {
            // Build the RestClient
            this.restClient = new RestClient(restClusterClientConfiguration.getRestClientConfiguration(), executorService);
        }

        this.waitStrategy = Preconditions.checkNotNull(waitStrategy);
        this.clusterId = Preconditions.checkNotNull(clusterId);

        if (webMonitorRetrievalService == null) {
            this.webMonitorRetrievalService = highAvailabilityServices.getWebMonitorLeaderRetriever();
        } else {
            this.webMonitorRetrievalService = webMonitorRetrievalService;
        }
        this.dispatcherRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
        this.retryExecutorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
        startLeaderRetrievers();
    }

    RestClient is essentially an HTTP client implemented on top of Netty. At this point we have both a running cluster and a client that can issue requests against it; all that remains is submitting the job, which ends up in RestClusterClient.submitJob:

    @Override
    public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        log.info("Submitting job {} (detached: {}).", jobGraph.getJobID(), isDetached());

        final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);

        if (isDetached()) {
            try {
                return jobSubmissionFuture.get();
            } catch (Exception e) {
                throw new ProgramInvocationException("Could not submit job",
                    jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
            }
        } else {
            final CompletableFuture<JobResult> jobResultFuture = jobSubmissionFuture.thenCompose(
                ignored -> requestJobResult(jobGraph.getJobID()));

            final JobResult jobResult;
            try {
                jobResult = jobResultFuture.get();
            } catch (Exception e) {
                throw new ProgramInvocationException("Could not retrieve the execution result.",
                    jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
            }

            try {
                this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
                return lastJobExecutionResult;
            } catch (JobExecutionException e) {
                throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
            } catch (IOException | ClassNotFoundException e) {
                throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
            }
        }
    }
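
    The detached/attached split matters to callers: detached mode returns as soon as the submission is acknowledged, attached mode blocks until the JobResult arrives. A small hypothetical usage sketch (client, jobGraph and userCodeClassLoader are assumed to already exist):

    // Hypothetical usage sketch of ClusterClient.submitJob.
    JobSubmissionResult result = client.submitJob(jobGraph, userCodeClassLoader);
    if (result.isJobExecutionResult()) {
        // Attached mode: the job finished and the full execution result is available.
        long runtimeMillis = result.getJobExecutionResult().getNetRuntime();
        System.out.println("Job finished in " + runtimeMillis + " ms");
    } else {
        // Detached mode: only the JobID is known; the job keeps running on the cluster.
        System.out.println("Submitted job " + result.getJobID());
    }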

    On the server side, the submission URL is handled by JobSubmitHandler, which in turn calls DispatcherGateway.submitJob; from there the logic proceeds much as described above:

    @Override
    protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
        final Collection<File> uploadedFiles = request.getUploadedFiles();
        final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
            File::getName,
            Path::fromLocalFile
        ));

        if (uploadedFiles.size() != nameToFile.size()) {
            throw new RestHandlerException(
                String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
                    uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
                    nameToFile.size(),
                    uploadedFiles.size()),
                HttpResponseStatus.BAD_REQUEST
            );
        }

        final JobSubmitRequestBody requestBody = request.getRequestBody();

        if (requestBody.jobGraphFileName == null) {
            throw new RestHandlerException(
                String.format("The %s field must not be omitted or be null.",
                    JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
                HttpResponseStatus.BAD_REQUEST);
        }

        // Deserialize the JobGraph from the request body
        CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

        Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);

        Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

        // Upload the job's resource files to the BlobServer
        CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

        // Call the dispatcher's submitJob method
        CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));

        return jobSubmissionFuture.thenCombine(jobGraphFuture,
            (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    }

    About Flink on Kubernetes

    The YARN flow maps over to Kubernetes fairly naturally. Roughly, one would need to:

    1. Implement a Kubernetes CustomCommandLine whose createClusterDescriptor builds and returns the Kubernetes-specific cluster descriptor.
    2. Implement a Kubernetes ClusterDescriptor that deploys the cluster (see the sketch below).
    3. Implement a Kubernetes-based ResourceManager.
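
    A purely hypothetical skeleton of the ClusterDescriptor piece, mirroring YarnClusterDescriptor (the class name and the Kubernetes steps are assumptions, not Flink API; the interface shape follows Flink 1.8):

    import org.apache.flink.client.deployment.ClusterDeploymentException;
    import org.apache.flink.client.deployment.ClusterDescriptor;
    import org.apache.flink.client.deployment.ClusterRetrieveException;
    import org.apache.flink.client.deployment.ClusterSpecification;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.util.FlinkException;

    // Hypothetical sketch only: a Kubernetes counterpart to YarnClusterDescriptor.
    public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {

        @Override
        public String getClusterDescription() {
            return "Kubernetes cluster (sketch)";
        }

        @Override
        public ClusterClient<String> retrieve(String clusterId) throws ClusterRetrieveException {
            // Look up the existing JobManager Service for clusterId and build a client against it.
            throw new UnsupportedOperationException("sketch only");
        }

        @Override
        public ClusterClient<String> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
            throw new UnsupportedOperationException("sketch only");
        }

        @Override
        public ClusterClient<String> deployJobCluster(
                ClusterSpecification clusterSpecification,
                JobGraph jobGraph,
                boolean detached) throws ClusterDeploymentException {
            // 1. Create a JobManager Deployment whose container command launches the
            //    Kubernetes analogue of YarnJobClusterEntrypoint.
            // 2. Create a Service exposing the JobManager's REST port.
            // 3. Return a RestClusterClient pointing at that Service, as on YARN.
            throw new UnsupportedOperationException("sketch only");
        }

        @Override
        public void killCluster(String clusterId) throws FlinkException {
            // Delete the Deployment/Service created above.
            throw new UnsupportedOperationException("sketch only");
        }

        @Override
        public void close() {
            // Release the Kubernetes API client.
        }
    }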

    On the Kubernetes side, the bulk of the work comes down to implementing the corresponding operators for the JobManager and TaskManager.
