1. Environment Preparation and Importing the Source
From the previous post we now have a basic picture of how HDFS communicates remotely, so we are ready to start reading the source code.
We take a scenario-driven approach, so for now we will follow only the NameNode startup flow. Where should we start? To read source code we first need the program's entry point, so what is the NameNode's entry point? The previous post covered Hadoop RPC, and recall that after installing a NameNode we always run jps to check whether the NameNode process has started. That suggests a guess: the NameNode itself is the RPC server. Next we will verify that guess, and the verification is exactly the NameNode startup flow. Let's dive into the source.
1.1 Environment preparation
Version: I chose Hadoop 2.7 and downloaded its source code.
Import it into IDEA.
2. Reading the Source
2.1 The NameNode class comment
When reading source code, always read the comments on the important classes first. Let's translate the NameNode class comment:
The NameNode service manages the metadata of an HDFS cluster, its "inode table". A cluster has only one NameNode (leaving aside HA and federation).
The NameNode maintains two important tables:
1) one table maps each file to its blocks;
2) the other maps each block to the DataNode hosts that store it.
The first table is precious and is persisted to disk (the file-to-block mapping does not change, so it is static data).
The second table is rebuilt each time the NameNode restarts.
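The two tables can be sketched as two in-memory maps. This is a toy model with hypothetical names, purely for illustration: in the real NameNode the first table lives in the fsimage/edit log on disk, while the second is repopulated from DataNode block reports after a restart.

```java
import java.util.*;

// Toy model of the NameNode's two tables (names are made up for this sketch).
class TwoTablesSketch {
    // Table 1: file path -> ordered list of block ids (static, persisted on disk)
    static Map<String, List<Long>> fileToBlocks = new HashMap<>();
    // Table 2: block id -> DataNode hostnames (dynamic, rebuilt on restart)
    static Map<Long, Set<String>> blockToNodes = new HashMap<>();

    static void addFile(String path, List<Long> blocks) {
        fileToBlocks.put(path, blocks);
    }

    // Simulates a DataNode block report repopulating table 2 after a restart.
    static void reportBlock(String datanode, long blockId) {
        blockToNodes.computeIfAbsent(blockId, k -> new HashSet<>()).add(datanode);
    }

    // Resolving a read goes through both tables: path -> blocks -> hosts.
    static Set<String> hostsForFirstBlock(String path) {
        long first = fileToBlocks.get(path).get(0);
        return blockToNodes.getOrDefault(first, Collections.emptySet());
    }
}
```

The split explains why table 2 need not be persisted: as long as DataNodes re-report their blocks, the NameNode can always reconstruct it.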
The NameNode service is supported by three important classes:
1) The NameNode class:
   manages the configuration parameters.
2) The NameNode servers:
   IPC server — NameNodeRpcServer: opens a port (e.g. 8020/9000) and waits for callers.
   HTTP server — NameNodeHttpServer: serves the web UI on port 50070, through which we can inspect the state of HDFS.
3) FSNamesystem:
   an extremely important class that manages the HDFS metadata.
2.2 NameNode's main method
public static void main(String argv[]) throws Exception {
  if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
    System.exit(0);
  }

  try {
    StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
    NameNode namenode = createNameNode(argv, null);
    if (namenode != null) {
      namenode.join();
    }
  } catch (Throwable e) {
    LOG.error("Failed to start namenode.", e);
    terminate(1, e);
  }
}
2.3 NameNode's createNameNode method
public static NameNode createNameNode(String argv[], Configuration conf)
    throws IOException {
  LOG.info("createNameNode " + Arrays.asList(argv));
  if (conf == null)
    conf = new HdfsConfiguration();
  StartupOption startOpt = parseArguments(argv);
  if (startOpt == null) {
    printUsage(System.err);
    return null;
  }
  setStartupOption(conf, startOpt);

  switch (startOpt) {
    case FORMAT: {
      boolean aborted = format(conf, startOpt.getForceFormat(),
          startOpt.getInteractiveFormat());
      terminate(aborted ? 1 : 0);
      return null;
    }
    case GENCLUSTERID: {
      System.err.println("Generating new cluster id:");
      System.out.println(NNStorage.newClusterID());
      terminate(0);
      return null;
    }
    case FINALIZE: {
      System.err.println("Use of the argument '" + StartupOption.FINALIZE +
          "' is no longer supported. To finalize an upgrade, start the NN " +
          " and then run `hdfs dfsadmin -finalizeUpgrade'");
      terminate(1);
      return null;
    }
    case ROLLBACK: {
      boolean aborted = doRollback(conf, true);
      terminate(aborted ? 1 : 0);
      return null;
    }
    case BOOTSTRAPSTANDBY: {
      String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
      int rc = BootstrapStandby.run(toolArgs, conf);
      terminate(rc);
      return null;
    }
    case INITIALIZESHAREDEDITS: {
      boolean aborted = initializeSharedEdits(conf,
          startOpt.getForceFormat(),
          startOpt.getInteractiveFormat());
      terminate(aborted ? 1 : 0);
      return null;
    }
    case BACKUP:
    case CHECKPOINT: {
      NamenodeRole role = startOpt.toNodeRole();
      DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
      return new BackupNode(conf, role);
    }
    case RECOVER: {
      NameNode.doRecovery(startOpt, conf);
      return null;
    }
    case METADATAVERSION: {
      printMetadataVersion(conf);
      terminate(0);
      return null;
    }
    case UPGRADEONLY: {
      DefaultMetricsSystem.initialize("NameNode");
      new NameNode(conf);
      terminate(0);
      return null;
    }
    default: {
      DefaultMetricsSystem.initialize("NameNode");
      return new NameNode(conf);
    }
  }
}
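The shape of createNameNode is worth noting: every maintenance option (FORMAT, ROLLBACK, …) does its work and terminates, and only the default branch constructs a NameNode. A stripped-down sketch of that dispatch pattern, with only a few options kept and everything else simplified:

```java
// Simplified sketch of createNameNode's dispatch-on-startup-option pattern.
// The enum values mirror real StartupOption names; the bodies are placeholders.
class StartupDispatchSketch {
    enum StartupOption { FORMAT, GENCLUSTERID, ROLLBACK, REGULAR }

    // Describes what each option would do; only the default (normal startup)
    // path would actually construct a NameNode instance.
    static String dispatch(StartupOption opt) {
        switch (opt) {
            case FORMAT:       return "format storage dirs, then terminate";
            case GENCLUSTERID: return "print a new cluster id, then terminate";
            case ROLLBACK:     return "roll back the upgrade, then terminate";
            default:           return "construct NameNode";  // normal startup
        }
    }
}
```

So when we start the cluster normally (no option, i.e. the REGULAR path), control falls into the default branch and `new NameNode(conf)` runs, which is the constructor we read next.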
2.4 NameNode's constructor
protected NameNode(Configuration conf, NamenodeRole role)
    throws IOException {
  this.conf = conf;
  this.role = role;
  setClientNamenodeAddress(conf);
  String nsId = getNameServiceId(conf);
  String namenodeId = HAUtil.getNameNodeId(conf, nsId);
  this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
  state = createHAState(getStartupOption(conf));
  this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
  this.haContext = createHAContext();
  try {
    initializeGenericKeys(conf, nsId, namenodeId);
    initialize(conf);
    try {
      haContext.writeLock();
      state.prepareToEnterState(haContext);
      state.enterState(haContext);
    } finally {
      haContext.writeUnlock();
    }
  } catch (IOException e) {
    this.stop();
    throw e;
  } catch (HadoopIllegalArgumentException e) {
    this.stop();
    throw e;
  }
  this.started.set(true);
}
2.5 The initialize method called from the NameNode constructor
protected void initialize(Configuration conf) throws IOException {
  ...
  if (NamenodeRole.NAMENODE == role) {
    startHttpServer(conf);
  }
  loadNamesystem(conf);
  rpcServer = createRpcServer(conf);
  ...
}
2.6 The start method called from NameNode's startHttpServer
private void startHttpServer(final Configuration conf) throws IOException {
  httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
  httpServer.start();
  httpServer.setStartupProgress(startupProgress);
}

void start() throws IOException {
  HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
  final String infoHost = bindAddress.getHostName();

  final InetSocketAddress httpAddr = bindAddress;
  final String httpsAddrString = conf.getTrimmed(
      DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
      DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT);
  InetSocketAddress httpsAddr = NetUtils.createSocketAddr(httpsAddrString);

  if (httpsAddr != null) {
    final String bindHost =
        conf.getTrimmed(DFSConfigKeys.DFS_NAMENODE_HTTPS_BIND_HOST_KEY);
    if (bindHost != null && !bindHost.isEmpty()) {
      httpsAddr = new InetSocketAddress(bindHost, httpsAddr.getPort());
    }
  }

  HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
      httpAddr, httpsAddr, "hdfs",
      DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);

  httpServer = builder.build();

  if (policy.isHttpsEnabled()) {
    InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf.getTrimmed(
        DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":"
            + DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT));
    httpServer.setAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY,
        datanodeSslPort.getPort());
  }

  initWebHdfs(conf);

  httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
  httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
  setupServlets(httpServer, conf);
  httpServer.start();

  int connIdx = 0;
  if (policy.isHttpEnabled()) {
    httpAddress = httpServer.getConnectorAddress(connIdx++);
    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
        NetUtils.getHostPortString(httpAddress));
  }

  if (policy.isHttpsEnabled()) {
    httpsAddress = httpServer.getConnectorAddress(connIdx);
    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
        NetUtils.getHostPortString(httpsAddress));
  }
}
2.7 The HTTP server binds many servlets (in a sense, the HTTP server is also a form of RPC)
private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
  httpServer.addInternalServlet("startupProgress",
      StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);
  httpServer.addInternalServlet("getDelegationToken",
      GetDelegationTokenServlet.PATH_SPEC,
      GetDelegationTokenServlet.class, true);
  httpServer.addInternalServlet("renewDelegationToken",
      RenewDelegationTokenServlet.PATH_SPEC,
      RenewDelegationTokenServlet.class, true);
  httpServer.addInternalServlet("cancelDelegationToken",
      CancelDelegationTokenServlet.PATH_SPEC,
      CancelDelegationTokenServlet.class, true);
  httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true);
  httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
      ImageServlet.class, true);
  httpServer.addInternalServlet("listPaths", "/listPaths/*",
      ListPathsServlet.class, false);
  httpServer.addInternalServlet("data", "/data/*",
      FileDataServlet.class, false);
  httpServer.addInternalServlet("checksum", "/fileChecksum/*",
      FileChecksumServlets.RedirectServlet.class, false);
  httpServer.addInternalServlet("contentSummary", "/contentSummary/*",
      ContentSummaryServlet.class, false);
}
At this point HttpServer2 is up and running.
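The pattern in setupServlets is simply "register several path-to-handler bindings on one embedded HTTP server". We can reproduce it with the JDK's built-in com.sun.net.httpserver (not what Hadoop uses internally; HttpServer2 wraps Jetty, and the paths and reply bodies below are made up for the sketch):

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;

// Minimal analogue of setupServlets: bind several URL paths to handlers
// on one embedded HTTP server, then start it.
class MiniHttpSketch {
    static HttpServer start() throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0); // port 0 = ephemeral
        // Each createContext call plays the role of one addInternalServlet call.
        server.createContext("/fsck", ex -> reply(ex, "fsck ok"));
        server.createContext("/listPaths", ex -> reply(ex, "paths"));
        server.start();
        return server;
    }

    static void reply(HttpExchange ex, String body) throws IOException {
        byte[] b = body.getBytes("UTF-8");
        ex.sendResponseHeaders(200, b.length);
        try (OutputStream os = ex.getResponseBody()) { os.write(b); }
    }
}
```

When a browser hits port 50070, it is this kind of path-to-handler table that decides which servlet's code runs, which is why binding more servlets makes the HTTP server more capable.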
2.8 NameNode's createRpcServer method
protected NameNodeRpcServer createRpcServer(Configuration conf)
    throws IOException {
  return new NameNodeRpcServer(conf, this);
}
2.9 NameNodeRpcServer's constructor
public NameNodeRpcServer(Configuration conf, NameNode nn)
    throws IOException {
  this.nn = nn;
  this.namesystem = nn.getNamesystem();
  this.retryCache = namesystem.getRetryCache();
  this.metrics = NameNode.getNameNodeMetrics();

  int handlerCount = conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
      DFS_NAMENODE_HANDLER_COUNT_DEFAULT);

  RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
      ProtobufRpcEngine.class);

  ClientNamenodeProtocolServerSideTranslatorPB clientProtocolServerTranslator =
      new ClientNamenodeProtocolServerSideTranslatorPB(this);
  BlockingService clientNNPbService = ClientNamenodeProtocol.
      newReflectiveBlockingService(clientProtocolServerTranslator);

  DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
      new DatanodeProtocolServerSideTranslatorPB(this);
  BlockingService dnProtoPbService = DatanodeProtocolService
      .newReflectiveBlockingService(dnProtoPbTranslator);

  NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
      new NamenodeProtocolServerSideTranslatorPB(this);
  BlockingService NNPbService = NamenodeProtocolService
      .newReflectiveBlockingService(namenodeProtocolXlator);

  RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator =
      new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);
  BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService
      .newReflectiveBlockingService(refreshAuthPolicyXlator);

  RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator =
      new RefreshUserMappingsProtocolServerSideTranslatorPB(this);
  BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService
      .newReflectiveBlockingService(refreshUserMappingXlator);

  RefreshCallQueueProtocolServerSideTranslatorPB refreshCallQueueXlator =
      new RefreshCallQueueProtocolServerSideTranslatorPB(this);
  BlockingService refreshCallQueueService = RefreshCallQueueProtocolService
      .newReflectiveBlockingService(refreshCallQueueXlator);

  GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
      new GenericRefreshProtocolServerSideTranslatorPB(this);
  BlockingService genericRefreshService = GenericRefreshProtocolService
      .newReflectiveBlockingService(genericRefreshXlator);

  GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =
      new GetUserMappingsProtocolServerSideTranslatorPB(this);
  BlockingService getUserMappingService = GetUserMappingsProtocolService
      .newReflectiveBlockingService(getUserMappingXlator);

  HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
      new HAServiceProtocolServerSideTranslatorPB(this);
  BlockingService haPbService = HAServiceProtocolService
      .newReflectiveBlockingService(haServiceProtocolXlator);

  TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
      new TraceAdminProtocolServerSideTranslatorPB(this);
  BlockingService traceAdminService = TraceAdminService
      .newReflectiveBlockingService(traceAdminXlator);

  WritableRpcEngine.ensureInitialized();

  InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
  if (serviceRpcAddr != null) {
    String bindHost = nn.getServiceRpcServerBindHost(conf);
    if (bindHost == null) {
      bindHost = serviceRpcAddr.getHostName();
    }
    LOG.info("Service RPC server is binding to " + bindHost + ":" +
        serviceRpcAddr.getPort());

    int serviceHandlerCount =
        conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
            DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
    this.serviceRpcServer = new RPC.Builder(conf)
        .setProtocol(
            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress(bindHost)
        .setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager())
        .build();

    DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
        serviceRpcServer);
    DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
        serviceRpcServer);
    DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
        serviceRpcServer);
    DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
        refreshAuthService, serviceRpcServer);
    DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
        refreshUserMappingService, serviceRpcServer);
    DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
        refreshCallQueueService, serviceRpcServer);
    DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
        genericRefreshService, serviceRpcServer);
    DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
        getUserMappingService, serviceRpcServer);
    DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class,
        traceAdminService, serviceRpcServer);

    InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress();
    serviceRPCAddress = new InetSocketAddress(
        serviceRpcAddr.getHostName(), listenAddr.getPort());
    nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
  } else {
    serviceRpcServer = null;
    serviceRPCAddress = null;
  }

  InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
  String bindHost = nn.getRpcServerBindHost(conf);
  if (bindHost == null) {
    bindHost = rpcAddr.getHostName();
  }
  LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort());

  this.clientRpcServer = new RPC.Builder(conf)
      .setProtocol(
          org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
      .setInstance(clientNNPbService).setBindAddress(bindHost)
      .setPort(rpcAddr.getPort()).setNumHandlers(handlerCount)
      .setVerbose(false)
      .setSecretManager(namesystem.getDelegationTokenSecretManager()).build();

  DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
      clientRpcServer);
  DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
      clientRpcServer);
  DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
      clientRpcServer);
  DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
      refreshAuthService, clientRpcServer);
  DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
      refreshUserMappingService, clientRpcServer);
  DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
      refreshCallQueueService, clientRpcServer);
  DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
      genericRefreshService, clientRpcServer);
  DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
      getUserMappingService, clientRpcServer);
  DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class,
      traceAdminService, clientRpcServer);

  if (serviceAuthEnabled =
      conf.getBoolean(
          CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
    clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
    if (serviceRpcServer != null) {
      serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
    }
  }

  InetSocketAddress listenAddr = clientRpcServer.getListenerAddress();
  clientRpcAddress = new InetSocketAddress(
      rpcAddr.getHostName(), listenAddr.getPort());
  nn.setRpcServerAddress(conf, clientRpcAddress);

  minimumDataNodeVersion = conf.get(
      DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
      DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);

  this.clientRpcServer.addTerseExceptions(SafeModeException.class,
      FileNotFoundException.class,
      HadoopIllegalArgumentException.class,
      FileAlreadyExistsException.class,
      InvalidPathException.class,
      ParentNotDirectoryException.class,
      UnresolvedLinkException.class,
      AlreadyBeingCreatedException.class,
      QuotaExceededException.class,
      RecoveryInProgressException.class,
      AccessControlException.class,
      InvalidToken.class,
      LeaseExpiredException.class,
      NSQuotaExceededException.class,
      DSQuotaExceededException.class,
      AclException.class,
      FSLimitException.PathComponentTooLongException.class,
      FSLimitException.MaxDirectoryItemsExceededException.class,
      UnresolvedPathException.class);
}
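The key structural point in this constructor is that it can build two RPC servers on separate ports: serviceRpcServer (if a service RPC address is configured) for DataNodes and other NameNodes, and clientRpcServer for clients, with both sharing the same handler object (this). A sketch of just that dual-endpoint idea, with plain ServerSockets standing in for the RPC machinery and illustrative field names:

```java
import java.io.IOException;
import java.net.ServerSocket;

// Sketch of NameNodeRpcServer's dual endpoints: two listeners on separate
// ports so DataNode traffic (heartbeats, block reports) and client traffic
// are served independently. ServerSocket is a stand-in for Hadoop's RPC.Server.
class DualEndpointSketch {
    final ServerSocket serviceEndpoint;  // DataNode-facing (serviceRpcServer)
    final ServerSocket clientEndpoint;   // client-facing (clientRpcServer)

    DualEndpointSketch() throws IOException {
        serviceEndpoint = new ServerSocket(0); // port 0 = pick an ephemeral port
        clientEndpoint = new ServerSocket(0);
    }

    void close() throws IOException {
        serviceEndpoint.close();
        clientEndpoint.close();
    }
}
```

Separating the two means a flood of client requests cannot occupy every handler thread that DataNode heartbeats depend on, which is presumably why the real code sizes them with separate handler-count keys.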
2.10 Why does the NameNode itself handle our requests?
Let's look at the NameNode's member variables:
protected boolean stopRequested = false;
protected NamenodeRegistration nodeRegistration;
private List<ServicePlugin> plugins;
private NameNodeRpcServer rpcServer;
Since the NameNode holds a NameNodeRpcServer, we can conclude that the NameNode is indeed the RPC server.
3. Summary
1) The HTTP server is created first. Many servlets are bound to it, and the more servlets are bound, the more it can do; when a browser operates on HDFS through port 50070, it is really invoking this HTTP server's methods.
2) Then the NameNodeRpcServer is started. This is the RPC server through which the NameNode serves the outside world.
3) NameNodeRpcServer contains two servers, serviceRpcServer and clientRpcServer: serviceRpcServer handles requests from DataNodes, while clientRpcServer handles requests from clients.