HDFS Source Code Walkthrough: Verifying That the NameNode Is the RPC Server (Part 2)


I. Environment Preparation and Source Code Import

From the previous post in this series, we already have a basic picture of how HDFS communicates remotely, so we can now move on to reading the source code.

We are taking a scenario-driven approach, so for now we will only follow the NameNode startup flow. Where should we start? To read source code we need the program's entry point, so what is the NameNode's entry point? The previous post covered Hadoop RPC. Recall that after installing the NameNode we always run jps to check whether the NameNode process is up. That suggests a hypothesis: the NameNode process is itself the RPC server. In this post we verify that hypothesis, and tracing the verification is exactly tracing the NameNode startup flow. Let's read the source.

1.1 Environment Preparation

For the version, I chose Hadoop 2.7; download its source code.


Import it into IDEA.

II. Reading the Source Code

2.1 The NameNode Class Comment

When reading source code, always start with the comments on the important classes.

    /**
     * NameNode serves as both directory namespace manager and
     * "inode table" for the Hadoop DFS. There is a single NameNode
     * running in any DFS deployment. (Well, except when there
     * is a second backup/failover NameNode, or when using federated
     * NameNodes.)
     *
     * The NameNode controls two critical tables:
     *   1) filename->blocksequence (namespace)
     *   2) block->machinelist ("inodes")
     *
     * The first table is stored on disk and is very precious.
     * The second table is rebuilt every time the NameNode comes up.
     *
     * 'NameNode' refers to both this class as well as the 'NameNode server'.
     * The 'FSNamesystem' class actually performs most of the filesystem
     * management. The majority of the 'NameNode' class itself is concerned
     * with exposing the IPC interface and the HTTP server to the outside
     * world, plus some configuration management.
     */

In plain terms, the comment says the following:

The NameNode service manages the HDFS cluster's metadata and acts as its "inode table". A cluster has exactly one NameNode (leaving aside HA and federation). It maintains two important tables:

1) A table mapping each file to its blocks.
2) A table mapping each block to the DataNodes that hold it.

The first table is precious and is persisted to disk (the file-to-block mapping does not change on its own, so it is static data). The second table is rebuilt every time the NameNode restarts. Three important pieces support the NameNode service:

1) The NameNode class itself: manages configuration parameters.
2) The NameNode servers:
   - IPC server (NameNodeRpcServer): opens a port, e.g. 8020/9000, and waits for callers.
   - HTTP server (NameNodeHttpServer): serves the web UI on port 50070, where we can inspect the state of HDFS.
3) FSNamesystem: a critical class that manages the HDFS metadata.
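
To make the two tables concrete, here is a minimal illustrative sketch. Every name in it (NamespaceModel, reportBlock, and so on) is hypothetical; none of this is Hadoop's actual code:

    import java.util.*;

    // Purely illustrative model of the NameNode's two tables.
    class NamespaceModel {
        // Table 1: filename -> block sequence (the "namespace").
        // Persisted to disk (fsimage + edit log) because it cannot be
        // reconstructed from anywhere else.
        private final Map<String, List<Long>> fileToBlocks = new HashMap<>();

        // Table 2: block -> machine list.
        // NOT persisted: rebuilt from DataNode block reports at every restart.
        private final Map<Long, List<String>> blockToDataNodes = new HashMap<>();

        void addFile(String path, List<Long> blockIds) {
            fileToBlocks.put(path, blockIds);
        }

        // Simulates a DataNode block report repopulating table 2.
        void reportBlock(long blockId, String dataNodeHost) {
            blockToDataNodes.computeIfAbsent(blockId, k -> new ArrayList<>())
                            .add(dataNodeHost);
        }
    }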

2.2 NameNode's main Method

    public static void main(String argv[]) throws Exception {
      // Parse the arguments
      if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
        // Exit if help was requested or the arguments were invalid
        System.exit(0);
      }

      try {
        StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
        // TODO The core code that creates the NameNode, exactly as the name says
        NameNode namenode = createNameNode(argv, null);
        if (namenode != null) {
          // Block here; this is why `jps` keeps showing the NameNode process
          namenode.join();
        }
      } catch (Throwable e) {
        LOG.error("Failed to start namenode.", e);
        terminate(1, e);
      }
    }
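
The namenode.join() point is worth a tiny experiment. A minimal sketch (our own demo, not NameNode code) shows why a main thread blocked in join() keeps the JVM process alive and therefore visible in jps:

    // Minimal illustration of why a blocked join() keeps a JVM process alive.
    public class JoinDemo {
        public static void main(String[] args) throws InterruptedException {
            Thread server = new Thread(() -> {
                // Stands in for the RPC server's listener loop.
                while (true) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            });
            server.start();
            // Blocks forever, just like namenode.join(); run `jps` in
            // another terminal and JoinDemo stays listed.
            server.join();
        }
    }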

2.3 NameNode's createNameNode Method

    public static NameNode createNameNode(String argv[], Configuration conf)
        throws IOException {
      LOG.info("createNameNode " + Arrays.asList(argv));
      if (conf == null)
        conf = new HdfsConfiguration();
      /**
       * When we operate the HDFS cluster, arguments like these are passed in:
       *   hdfs namenode -format
       *   hadoop-daemon.sh start namenode
       */
      StartupOption startOpt = parseArguments(argv);
      if (startOpt == null) {
        printUsage(System.err);
        return null;
      }
      setStartupOption(conf, startOpt);

      // Dispatch on the requested operation
      switch (startOpt) {
        case FORMAT: {
          boolean aborted = format(conf, startOpt.getForceFormat(),
              startOpt.getInteractiveFormat());
          terminate(aborted ? 1 : 0);
          return null; // avoid javac warning
        }
        case GENCLUSTERID: {
          System.err.println("Generating new cluster id:");
          System.out.println(NNStorage.newClusterID());
          terminate(0);
          return null;
        }
        case FINALIZE: {
          System.err.println("Use of the argument '" + StartupOption.FINALIZE +
              "' is no longer supported. To finalize an upgrade, start the NN " +
              " and then run `hdfs dfsadmin -finalizeUpgrade'");
          terminate(1);
          return null; // avoid javac warning
        }
        case ROLLBACK: {
          boolean aborted = doRollback(conf, true);
          terminate(aborted ? 1 : 0);
          return null; // avoid warning
        }
        case BOOTSTRAPSTANDBY: {
          String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
          int rc = BootstrapStandby.run(toolArgs, conf);
          terminate(rc);
          return null; // avoid warning
        }
        case INITIALIZESHAREDEDITS: {
          boolean aborted = initializeSharedEdits(conf,
              startOpt.getForceFormat(),
              startOpt.getInteractiveFormat());
          terminate(aborted ? 1 : 0);
          return null; // avoid warning
        }
        case BACKUP:
        case CHECKPOINT: {
          NamenodeRole role = startOpt.toNodeRole();
          DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
          return new BackupNode(conf, role);
        }
        case RECOVER: {
          NameNode.doRecovery(startOpt, conf);
          return null;
        }
        case METADATAVERSION: {
          printMetadataVersion(conf);
          terminate(0);
          return null; // avoid javac warning
        }
        case UPGRADEONLY: {
          DefaultMetricsSystem.initialize("NameNode");
          new NameNode(conf);
          terminate(0);
          return null;
        }
        // A plain start carries no special option, so this branch runs
        default: {
          DefaultMetricsSystem.initialize("NameNode");
          // The NameNode initialization path
          return new NameNode(conf);
        }
      }
    }
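
As a rough illustration of the dispatch logic, a simplified, hypothetical sketch of how command-line flags map to startup options might look like this (the real parseArguments in Hadoop handles many more flags and modifier arguments):

    // Hypothetical, simplified version of the argument parsing above.
    enum StartupOpt { FORMAT, ROLLBACK, BACKUP, RECOVER, REGULAR }

    static StartupOpt parse(String[] argv) {
        if (argv.length == 0) {
            return StartupOpt.REGULAR;  // plain `hadoop-daemon.sh start namenode`
        }
        switch (argv[0].toLowerCase()) {
            case "-format":   return StartupOpt.FORMAT;   // `hdfs namenode -format`
            case "-rollback": return StartupOpt.ROLLBACK;
            case "-backup":   return StartupOpt.BACKUP;
            case "-recover":  return StartupOpt.RECOVER;
            default:          return null;                // unknown -> print usage
        }
    }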

2.4 NameNode's Constructor

    protected NameNode(Configuration conf, NamenodeRole role)
        throws IOException {
      this.conf = conf;
      this.role = role;
      setClientNamenodeAddress(conf);
      String nsId = getNameServiceId(conf);
      String namenodeId = HAUtil.getNameNodeId(conf, nsId);
      this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
      state = createHAState(getStartupOption(conf));
      this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
      this.haContext = createHAContext();
      try {
        initializeGenericKeys(conf, nsId, namenodeId);
        // When reading source code, always watch for key methods like this one
        // TODO The initialization method
        initialize(conf);
        try {
          haContext.writeLock();
          state.prepareToEnterState(haContext);
          state.enterState(haContext);
        } finally {
          haContext.writeUnlock();
        }
      } catch (IOException e) {
        this.stop();
        throw e;
      } catch (HadoopIllegalArgumentException e) {
        this.stop();
        throw e;
      }
      this.started.set(true);
    }

2.5 The initialize Method Called from NameNode's Constructor

    /**
     * Initialize name-node.
     *
     * @param conf the configuration
     */
    protected void initialize(Configuration conf) throws IOException {
      // Clearly not the core code, omitted here
      ...
      /**
       * The NameNode startup flow, server side:
       *   RPC server  -> 9000/8020
       *   HTTP server -> 50070
       */
      if (NamenodeRole.NAMENODE == role) {
        // TODO Start the HTTP server
        startHttpServer(conf);
      }
      // Load the metadata
      loadNamesystem(conf);
      // Create the NameNode RPC server
      rpcServer = createRpcServer(conf);
      ....
    }

2.6 startHttpServer in NameNode and the start Method It Calls

    private void startHttpServer(final Configuration conf) throws IOException {
      httpServer = new NameNodeHttpServer(conf, this,
          getHttpServerBindAddress(conf));
      // Focus on the start method
      httpServer.start();
      httpServer.setStartupProgress(startupProgress);
    }

    // Start the HTTP server
    void start() throws IOException {
      HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
      final String infoHost = bindAddress.getHostName();

      final InetSocketAddress httpAddr = bindAddress;
      final String httpsAddrString = conf.getTrimmed(
          DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
          DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT);
      InetSocketAddress httpsAddr = NetUtils.createSocketAddr(httpsAddrString);

      if (httpsAddr != null) {
        // If DFS_NAMENODE_HTTPS_BIND_HOST_KEY exists then it overrides the
        // host name portion of DFS_NAMENODE_HTTPS_ADDRESS_KEY.
        final String bindHost =
            conf.getTrimmed(DFSConfigKeys.DFS_NAMENODE_HTTPS_BIND_HOST_KEY);
        if (bindHost != null && !bindHost.isEmpty()) {
          httpsAddr = new InetSocketAddress(bindHost, httpsAddr.getPort());
        }
      }

      /**
       * Hadoop likes to wrap things itself. For example, RPC frameworks
       * already existed, yet Hadoop wrapped its own Hadoop RPC; likewise,
       * HTTP servers already existed, yet Hadoop wrapped its own HttpServer2.
       */
      HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
          httpAddr, httpsAddr, "hdfs",
          DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
          DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);

      httpServer = builder.build();

      if (policy.isHttpsEnabled()) {
        // assume same ssl port for all datanodes
        InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(
            conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY,
                infoHost + ":" + DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT));
        httpServer.setAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY,
            datanodeSslPort.getPort());
      }

      initWebHdfs(conf);

      httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
      httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
      /**
       * TODO Bind a whole set of servlets;
       * the more servlets, the more functionality is supported
       */
      setupServlets(httpServer, conf);
      // TODO Start the HTTP server, exposing port 50070 to the outside
      httpServer.start();

      int connIdx = 0;
      if (policy.isHttpEnabled()) {
        httpAddress = httpServer.getConnectorAddress(connIdx++);
        conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
            NetUtils.getHostPortString(httpAddress));
      }
      if (policy.isHttpsEnabled()) {
        httpsAddress = httpServer.getConnectorAddress(connIdx);
        conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
            NetUtils.getHostPortString(httpsAddress));
      }
    }

2.7 The HTTP Server Binds Many Servlets (in a sense, the HTTP server is also a form of RPC)

    private static void setupServlets(HttpServer2 httpServer,
        Configuration conf) {
      httpServer.addInternalServlet("startupProgress",
          StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);
      httpServer.addInternalServlet("getDelegationToken",
          GetDelegationTokenServlet.PATH_SPEC,
          GetDelegationTokenServlet.class, true);
      httpServer.addInternalServlet("renewDelegationToken",
          RenewDelegationTokenServlet.PATH_SPEC,
          RenewDelegationTokenServlet.class, true);
      httpServer.addInternalServlet("cancelDelegationToken",
          CancelDelegationTokenServlet.PATH_SPEC,
          CancelDelegationTokenServlet.class, true);
      httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true);
      // TODO Handles fsimage upload requests.
      // The FSImage merged by the SecondaryNameNode (or the standby NameNode)
      // has to replace the active NameNode's fsimage; the upload is an HTTP
      // request, and it is dispatched to this servlet.
      httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
          ImageServlet.class, true);
      // TODO We can browse directory contents on the 50070 web UI
      // precisely because this servlet is registered here
      httpServer.addInternalServlet("listPaths", "/listPaths/*",
          ListPathsServlet.class, false);
      httpServer.addInternalServlet("data", "/data/*",
          FileDataServlet.class, false);
      httpServer.addInternalServlet("checksum", "/fileChecksum/*",
          FileChecksumServlets.RedirectServlet.class, false);
      httpServer.addInternalServlet("contentSummary", "/contentSummary/*",
          ContentSummaryServlet.class, false);
    }

At this point HttpServer2 is up and running.
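
The pattern here is one embedded server with many handlers bound to it. A minimal sketch using the JDK's built-in com.sun.net.httpserver (deliberately not Hadoop's HttpServer2, whose builder API is more involved) shows the same idea:

    import com.sun.net.httpserver.HttpServer;
    import java.io.OutputStream;
    import java.net.InetSocketAddress;

    // Sketch of the "one server, many bound handlers" pattern that
    // NameNodeHttpServer follows; uses the JDK server, not HttpServer2.
    public class HttpPatternDemo {
        public static void main(String[] args) throws Exception {
            HttpServer server = HttpServer.create(new InetSocketAddress(50070), 0);

            // Each createContext call plays the role of one addInternalServlet:
            // more registered paths -> more functionality on the same port.
            server.createContext("/listPaths", exchange -> {
                byte[] body = "would list directory contents here".getBytes();
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream os = exchange.getResponseBody()) {
                    os.write(body);
                }
            });
            server.createContext("/fsck", exchange -> {
                byte[] body = "would run a filesystem check here".getBytes();
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream os = exchange.getResponseBody()) {
                    os.write(body);
                }
            });

            server.start();  // analogous to httpServer.start() opening 50070
        }
    }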

2.8 NameNode's createRpcServer Method

    /**
     * Create the RPC server implementation. Used as an extension point for the
     * BackupNode.
     */
    protected NameNodeRpcServer createRpcServer(Configuration conf)
        throws IOException {
      // Instantiate the NameNodeRpcServer
      return new NameNodeRpcServer(conf, this);
    }

2.9 NameNodeRpcServer's Constructor

    public NameNodeRpcServer(Configuration conf, NameNode nn)
        throws IOException {
      this.nn = nn;
      this.namesystem = nn.getNamesystem();
      this.retryCache = namesystem.getRetryCache();
      this.metrics = NameNode.getNameNodeMetrics();

      int handlerCount = conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
          DFS_NAMENODE_HANDLER_COUNT_DEFAULT);

      RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
          ProtobufRpcEngine.class);

      // TODO Create a set of services -- these are the RPC protocols
      ClientNamenodeProtocolServerSideTranslatorPB
          clientProtocolServerTranslator =
          new ClientNamenodeProtocolServerSideTranslatorPB(this);
      BlockingService clientNNPbService = ClientNamenodeProtocol.
          newReflectiveBlockingService(clientProtocolServerTranslator);

      DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
          new DatanodeProtocolServerSideTranslatorPB(this);
      BlockingService dnProtoPbService = DatanodeProtocolService
          .newReflectiveBlockingService(dnProtoPbTranslator);

      NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
          new NamenodeProtocolServerSideTranslatorPB(this);
      BlockingService NNPbService = NamenodeProtocolService
          .newReflectiveBlockingService(namenodeProtocolXlator);

      RefreshAuthorizationPolicyProtocolServerSideTranslatorPB
          refreshAuthPolicyXlator =
          new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);
      BlockingService refreshAuthService =
          RefreshAuthorizationPolicyProtocolService
          .newReflectiveBlockingService(refreshAuthPolicyXlator);

      RefreshUserMappingsProtocolServerSideTranslatorPB
          refreshUserMappingXlator =
          new RefreshUserMappingsProtocolServerSideTranslatorPB(this);
      BlockingService refreshUserMappingService =
          RefreshUserMappingsProtocolService
          .newReflectiveBlockingService(refreshUserMappingXlator);

      RefreshCallQueueProtocolServerSideTranslatorPB refreshCallQueueXlator =
          new RefreshCallQueueProtocolServerSideTranslatorPB(this);
      BlockingService refreshCallQueueService = RefreshCallQueueProtocolService
          .newReflectiveBlockingService(refreshCallQueueXlator);

      GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
          new GenericRefreshProtocolServerSideTranslatorPB(this);
      BlockingService genericRefreshService = GenericRefreshProtocolService
          .newReflectiveBlockingService(genericRefreshXlator);

      GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =
          new GetUserMappingsProtocolServerSideTranslatorPB(this);
      BlockingService getUserMappingService = GetUserMappingsProtocolService
          .newReflectiveBlockingService(getUserMappingXlator);

      HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
          new HAServiceProtocolServerSideTranslatorPB(this);
      BlockingService haPbService = HAServiceProtocolService
          .newReflectiveBlockingService(haServiceProtocolXlator);

      TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
          new TraceAdminProtocolServerSideTranslatorPB(this);
      BlockingService traceAdminService = TraceAdminService
          .newReflectiveBlockingService(traceAdminXlator);

      WritableRpcEngine.ensureInitialized();

      InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
      if (serviceRpcAddr != null) {
        String bindHost = nn.getServiceRpcServerBindHost(conf);
        if (bindHost == null) {
          bindHost = serviceRpcAddr.getHostName();
        }
        LOG.info("Service RPC server is binding to " + bindHost + ":" +
            serviceRpcAddr.getPort());

        int serviceHandlerCount = conf.getInt(
            DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
            DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
        // TODO Build the serviceRpcServer. Once up, it listens for requests
        // sent by the DataNodes (together with clientRpcServer, this is the
        // NameNode service you see with jps).
        this.serviceRpcServer = new RPC.Builder(conf)
            .setProtocol(
                org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
            .setInstance(clientNNPbService)
            .setBindAddress(bindHost)
            .setPort(serviceRpcAddr.getPort())
            .setNumHandlers(serviceHandlerCount)
            .setVerbose(false)
            .setSecretManager(namesystem.getDelegationTokenSecretManager())
            .build();

        // Add all the RPC protocols that the namenode implements
        DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
            serviceRpcServer);
        DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
            serviceRpcServer);
        DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
            serviceRpcServer);
        DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
            refreshAuthService, serviceRpcServer);
        DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
            refreshUserMappingService, serviceRpcServer);
        // We support Refreshing call queue here in case the client RPC queue
        // is full
        DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
            refreshCallQueueService, serviceRpcServer);
        DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
            genericRefreshService, serviceRpcServer);
        DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
            getUserMappingService, serviceRpcServer);
        DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class,
            traceAdminService, serviceRpcServer);

        // Update the address with the correct port
        InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress();
        serviceRPCAddress = new InetSocketAddress(
            serviceRpcAddr.getHostName(), listenAddr.getPort());
        nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
      } else {
        serviceRpcServer = null;
        serviceRPCAddress = null;
      }

      InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
      String bindHost = nn.getRpcServerBindHost(conf);
      if (bindHost == null) {
        bindHost = rpcAddr.getHostName();
      }
      LOG.info("RPC server is binding to " + bindHost + ":" +
          rpcAddr.getPort());

      // TODO Build the clientRpcServer:
      // the protocol through which clients operate on the NameNode
      this.clientRpcServer = new RPC.Builder(conf)
          .setProtocol(
              org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
          .setInstance(clientNNPbService)
          .setBindAddress(bindHost)
          .setPort(rpcAddr.getPort())
          .setNumHandlers(handlerCount)
          .setVerbose(false)
          .setSecretManager(namesystem.getDelegationTokenSecretManager())
          .build();

      // Add all the RPC protocols that the namenode implements
      DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
          clientRpcServer);
      DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
          clientRpcServer);
      DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
          clientRpcServer);
      DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
          refreshAuthService, clientRpcServer);
      DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
          refreshUserMappingService, clientRpcServer);
      DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
          refreshCallQueueService, clientRpcServer);
      DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
          genericRefreshService, clientRpcServer);
      DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
          getUserMappingService, clientRpcServer);
      DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class,
          traceAdminService, clientRpcServer);

      // set service-level authorization security policy
      if (serviceAuthEnabled = conf.getBoolean(
          CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
        clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
        if (serviceRpcServer != null) {
          serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
        }
      }

      // The rpc-server port can be ephemeral... ensure we have the correct
      // info
      InetSocketAddress listenAddr = clientRpcServer.getListenerAddress();
      clientRpcAddress = new InetSocketAddress(
          rpcAddr.getHostName(), listenAddr.getPort());
      nn.setRpcServerAddress(conf, clientRpcAddress);

      minimumDataNodeVersion = conf.get(
          DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
          DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);

      // Set terse exception whose stack trace won't be logged
      this.clientRpcServer.addTerseExceptions(SafeModeException.class,
          FileNotFoundException.class,
          HadoopIllegalArgumentException.class,
          FileAlreadyExistsException.class,
          InvalidPathException.class,
          ParentNotDirectoryException.class,
          UnresolvedLinkException.class,
          AlreadyBeingCreatedException.class,
          QuotaExceededException.class,
          RecoveryInProgressException.class,
          AccessControlException.class,
          InvalidToken.class,
          LeaseExpiredException.class,
          NSQuotaExceededException.class,
          DSQuotaExceededException.class,
          AclException.class,
          FSLimitException.PathComponentTooLongException.class,
          FSLimitException.MaxDirectoryItemsExceededException.class,
          UnresolvedPathException.class);
    }
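
Both servers above are built with the same RPC.Builder pattern we met in the previous post. A stripped-down sketch with a toy protocol of our own (DemoProtocol is ours, not a real HDFS protocol) shows the essence of what NameNodeRpcServer does, assuming Hadoop 2.7 is on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;

    // Toy protocol -- NOT a real HDFS protocol.
    interface DemoProtocol {
        long versionID = 1L;  // required by Hadoop's RPC machinery
        String echo(String message);
    }

    public class DemoRpcServer implements DemoProtocol {
        @Override
        public String echo(String message) {
            return "server got: " + message;
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Same builder calls as in NameNodeRpcServer's constructor.
            RPC.Server server = new RPC.Builder(conf)
                .setProtocol(DemoProtocol.class)
                .setInstance(new DemoRpcServer())
                .setBindAddress("0.0.0.0")
                .setPort(8020)        // the NameNode uses 8020/9000 here
                .setNumHandlers(10)   // cf. dfs.namenode.handler.count
                .build();
            server.start();
            server.join();            // just like namenode.join()
        }
    }

A client would then reach it with RPC.getProxy(DemoProtocol.class, DemoProtocol.versionID, new InetSocketAddress("localhost", 8020), conf), which is exactly the role DFSClient plays against the real NameNode.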

2.10 Why Does the NameNode Itself Handle the Requests We Send It?

Let's look at the fields of the NameNode class:

    protected boolean stopRequested = false;
    /** Registration information of this name-node */
    protected NamenodeRegistration nodeRegistration;
    /** Activated plug-ins. */
    private List<ServicePlugin> plugins;
    /** The RPC server is in fact a field of NameNode */
    private NameNodeRpcServer rpcServer;

At this point we can safely conclude that the NameNode is indeed the RPC server.
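
We can also verify this from the client side: every HDFS client operation is an RPC call into this very server. A minimal sketch, assuming a NameNode reachable at the placeholder address nn-host:8020:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Every call below travels over ClientNamenodeProtocol to the
    // clientRpcServer we just saw being built. "nn-host" is a placeholder.
    public class VerifyNameNodeRpc {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs =
                FileSystem.get(URI.create("hdfs://nn-host:8020"), conf);
            // listStatus -> ClientNamenodeProtocol.getListing -> clientRpcServer
            for (FileStatus status : fs.listStatus(new Path("/"))) {
                System.out.println(status.getPath());
            }
            fs.close();
        }
    }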

III. Summary

1. The HTTP server is created first, and many servlets are bound to it; the more servlets are bound, the more functionality it offers. When a browser operates on HDFS through port 50070, it is really invoking methods behind this HTTP server.
2. Then the NameNodeRpcServer is started; this is the RPC server through which the NameNode serves the outside world.
3. NameNodeRpcServer contains two servers, serviceRpcServer and clientRpcServer: serviceRpcServer handles requests from DataNodes, while clientRpcServer handles requests from clients.