Hadoop-Yarn源码-RPC基础

    技术2022-07-10  98

    一.简介

    一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式程序在内的应用程序更加轻易。 (Hadoop 2.6版本)

    二.RPC通信模型

    RPC通常采用客户机/服务器模型。

    RPC处理过程

    客户程序以本地方式调用系统产生的Stub程序该Stub程序将函数调用信息按照网络通信模块要求封装成消息包,并交给通信模块发送到远程服务端。远程服务端接收到此消息后,将此消息发送给相应的Stub程序。Stub程序拆封消息,形成被调用过程要求的形式,并调用对应函数。被调用函数按照所获参数执行并将结果返回给Stub程序。Stub程序将此结果封装成消息,通过网络通信模块逐级地传送给客户端程序。

    三.Hadoop RPC特点

    RPC实际上是一个分布式计算中C/S(Client/Server模型)的一个应用实例。

    透明性

    高性能

    可控性

    四.RPC总体架构

    Hadoop RPC主要分为四部分,分为序列化层、函数调用层、网络传输层、服务端处理框架。

    序列化层。序列化主要作用就是将结构化对象转换为字节流便于通过传输或写入持久存储,在RPC框架中,它主要作用于将用户请求中参数或者应答转换为字节流以便跨机传输。函数调用,函数调用层主要功能是定位要调用函数并执行该函数,HadoopRPC采用了基于Java反射机制与动态代理实现了函数调用。网络传输,网络传输描述了Client与Server之间消息传输的方式,HadoopRPC采用基于TCP/IP的Socket机制。服务器端处理,服务器端处理框架可抽象为网络I/O模型,它描述了客户端与服务端间信息交互方式,它的涉及直接决定这服务端的并发能力,常见的网络I/O模型有阻塞式I/O、非阻塞I/O、事件驱动I/O等,HadoopRPC采用了基于Reactor设计模式的事件驱动I/O模型。

    Hadoop RPC总体架构自下而上可分为两层

    第一层是一个基于JavaNIO实现的客户机-服务器(C/S)通信模型。其中,客户端将用户的调用方法及其参数封装成请求包发送服务端。服务器端收到请求包后,经解包、调用参数、打包结果等一系列操作后,将结果返回给客户端。为了增强Server端的扩展性和并发能力,Hadoop RPC采用了基于事件驱动的Reactor设计模式,在具体实现时,用到了JDK提供的各种功能包,主要包括java.nio、java.lang.reflect(反射机制和动态代理)、java.net(网络编程)等。第二层是供更上层程序直接调用的RPC接口,这些接口底层即为C/S通信模型。

    五.Hadoop RPC使用方法

    Hadoop RPC对外主要提供两种接口(org.apache.hadoop.ipc.RPC),分别是:

    //构造一个客户端代理对象(实现某个协议),用于向服务器发送RPC请求 public static <T> T getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, Configuration conf, SocketFactory factory) throws IOException { return getProtocolProxy( protocol, clientVersion, addr, conf, factory).getProxy(); }

    org.apache.hadoop.ipc.RPC.Builder 静态类,构造RPC Server

    5.1 定义RPC协议

    RPC协议是客户端和服务端之间通信接口,它定义了服务器端对外提供的服务器接口。

    public interface TestClientProtocol extends VersionedProtocol { //版本号,默认情况下,不同版本号Client 和Server之间不能相互通信 public static final long versionID = 1L; String echo(String value) throws IOException; int add(int v1,int v2) throws IOException; }

    5.2 实现协议

    Hadoop RPC协议通常是一个Java接口,用户实现该接口。

    public class TestClientProtocolImpl implements TestClientProtocol { @Override public String echo(String value) throws IOException { return value; } @Override public int add(int v1, int v2) throws IOException { return v1+v2; } @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { return TestClientProtocol.versionID; } @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException { return new ProtocolSignature(TestClientProtocol.versionID,null); } }

    5.3 构造并启动RPC Server

    直接使用静态类Builder构造一个RPC Server,并调用函数start()启动该Server。

    RPC.Server server = new RPC.Builder(conf).setProtocol(TestClientProtocol.class) .setInstance(new TestClientProtocolImpl()).setBindAddress(ADDRESS).setPort(99999) .setNumHandlers(5).build(); server.start();

    5.4 构造RPC Client并发送RPC请求

    使用静态方法getProxy构造客户端代理对象。

    TestClientProtocol proxy = RPC.getProxy(TestClientProtocol.class, TestClientProtocol.versionID, new InetSocketAddress(ADDRESS, 99999), conf); int result = proxy.add(5, 6); String test = proxy.echo("test"); System.out.println(result); System.out.println(test);

    六.Hadoop RPC类

    Hadoop RPC主要由三大类组成,即RPC、Client、Server,分别对应对外编程接口、客户端实现和服务器实现。

    6.1 ipc.RPC

    RPC类实际上是对底层客户机 - 服务器网络模型的封装,以便为程序员提供一套更方便简洁的编程接口。

    RPC类定义了一系列构建和销毁RPC客户端的方法,构建方法分为getProxy和waitForProxy两类,销毁方只有一个,即为stopProxy。RPC服务器的构建则由静态内部类RPC.Builder,该类提供了一些方法共用户设置一些基本的参数,设置完成参数,可调用build()完成一个服务器对象的构建,调用start()方法启动该服务器。

    6.2 ipc.Client

    Client主要完成的功能是发送远程过程调用信息并接收执行结果。

    Client内部有两个重要的内部类,分别Call和Connection。

    Call类

    封装一个RPC请求,它包含5个成员变量。它包含5个成员变量,分别是唯一标识ID、函数调用信息param、函数执行返回值value、出错或者异常信息error和执行完成标识符done。

    private Call(RPC.RpcKind rpcKind, Writable param) { this.rpcKind = rpcKind; this.rpcRequest = param; //callId ThreadLocal final Integer id = callId.get(); if (id == null) { this.id = nextCallId(); } else { callId.set(null); this.id = id; } //retryCount ThreadLocal final Integer rc = retryCount.get(); if (rc == null) { this.retry = 0; } else { this.retry = rc; } }

    由于Hadoop RPC Server采用异步方式处理客户端请求,这使远程过程调用的发生顺序与结果返回顺序无直接关系,而Client端正式提供ID识别不同的函数调用的。当客户端向服务器端发送请求时,只需填充id和param两个变量,而剩下的三个变量则由服务器根据函数执行情况填充。

    Connection类

    Client与每个Server之间维护一个通信连接,与该连接相关的基本信息及操作被封装到Connection类中,基本信息主要包括通信连接唯一标识、与Server端通信的Socket、网络输入数据流(in)、网络输出数据流(out)、保存RPC请求的哈希表(calls)等。

    addCall——将一个Call对象添加到哈希表。

    private synchronized boolean addCall(Call call) { if (shouldCloseConnection.get()) return false; calls.put(call.id, call); notify(); return true; }

    sendParam——向服务端发送RPC请求。

    public void sendRpcRequest(final Call call) //sendParamsExecutor 线程池发送请求

    receiveResponse——从服务器端接收处理已经处理完成的RPC请求。 run——Connection是一个线程类,它会run方法调用receiveResponse方法,会一直等待接收RPC返回结果。

    @Override public void run() { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": starting, having connections " + connections.size()); try { while (waitForWork()) {//wait here for work - read or close connection receiveRpcResponse(); } } catch (Throwable t) { // This truly is unexpected, since we catch IOException in receiveResponse // -- this is only to be really sure that we don't leave a client hanging // forever. LOG.warn("Unexpected error reading responses on connection " + this, t); markClosed(new IOException("Error reading responses", t)); } close(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": stopped, remaining connections " + connections.size()); }

    总体来说Client端实现比较简单,用hashTable的结构来维护connectionId -> connections以及callId -> calls 对应关系,使得请求响应不需要有严格的顺序性。

    HadoopRPC Client处理流程

    /** * Same as {@link #call(RPC.RpcKind, Writable, ConnectionId)} * 1.首先创建一个Call对象,封装RPC请求,成员变量有唯一标识id、请求数据、返回数据、是否完成等 * 2、创建Connection对象(它是个线程),并与服务器连接,即Client与Server之间的一个通信连接,保存未完成的Call对象至哈希表,唯一标识ID,Server通信的Socket,网络输入输出流。 * 3.调用connection.sendRpcRequest(call);将Call对象发送给Server * 4.等待Server端处理Call请求,服务端处理完成,通过网络返回服务端处理完成后,通过网络返回给Client端。这部分代码不在call方法里,还记得1中Connection是个线程吗?去run方法看看 线程一直循环,直到Server返回结果,然后调用receiveRpcResponse方法返回数据。 * 5.再次回到call方法,它也有个循环,一直在等待结果返回。结果返回后,检查下成功失败后,就将Call从哈希表中移除了。 * for RPC_BUILTIN */ public Writable call(Writable param, InetSocketAddress address) throws IOException { return call(RPC.RpcKind.RPC_BUILTIN, param, address); }

    6.3 ipc.Server

    Hadoop采用了Master/Slave结构,其中Master是整个系统的单点,这是制约系统性能和可扩展性的最关键因素之一。

    ipc.Server采用了很多提高并发处理能力的技术,主要包括线程池、事件驱动和Reactor设计模式等。

    Reactor是并发编程中一种基于事件驱动的设计模式

    通过派发/分离IO操作事件提高系统的并发性能。提供了粗粒度的并发控制,使用单线程实现,避免了复杂的同步处理。

    ipc.Server实际上实现了一个典型的Reactor设计模式

    Reactor : I/O事件的派发者。Acceptor : 接受来自Client的连接,建立与Client对应的Handler,并向Reactor注册此Handler。Handler : 与一个Client通信的实体,并按一定的过程实现业务的处理。Reader/Sender : 为了加速处理速度,Reactor模式往往构建一个存放数据处理线程的线程池,这样数据读出后,立即扔到线程吃中等待后续处理即可。为此,Reactor模式一般分离Handler中的读和写两个过程,分别注册成单独的读事件和写事件,并由对应的Reader和Sender线程处理。

    HadoopRPC Server处理流程

    接收请求

    该阶段主要任务是接收来自各个客户端的RPC请求,并将它们封装成固定的格式(Call类)放到一个共享队列(CallQueue)中,该阶段内部又分为建立连接和接收请求两个子阶段,分别由Listener和Reader两种线程完成。

    整个Server只有一个Listener线程,统一负责监听来自客户端的连接请求,一旦由新的请求到达,它会采用轮询的方式从线程池中选择一个Reader线程进行处理,而Reader线程可同时存在多个,它们分别负责接收一部分客户端连接的RPC请求,至于每个Reader线程负责哪些客户端连接,完全由Listener决定,当前Listener只是采用了简单的轮询分配机制。

    /** Listens on the socket. Creates jobs for the handler threads * 只有一个Listener线程,统一服装监听一个来自客户端连接请求,一旦有新的求到达,它会采用轮询的方式从线程池中选贼一个Reader线程进行处理 * 而Reader线程可同时存在多个,他们分别负责接收一部分客户端连接的RPC请求 * * */ private class Listener extends Thread { private ServerSocketChannel acceptChannel = null; //the accept channel private Selector selector = null; //the selector that we use for the server private Reader[] readers = null; private int currentReader = 0; private InetSocketAddress address; //the address we bind at private int backlogLength = conf.getInt( CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY, CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT); public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; for (int i = 0; i < readThreads; i++) { Reader reader = new Reader( "Socket Reader #" + (i + 1) + " for port " + port); readers[i] = reader; reader.start(); }

    Listener和Reader线程内部各自包含一个Selector对象,分别用于监SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。对于Listener线程,主循环的实现体是监听是否有新的连接请求到达,并采用轮询策略选择一个Reader线程处理新连接;对于Reader线程,主循环的实现体是监听客户端连接中是否有新的RPC请求到达,并将新的RPC请求封装成Call对象,放到共享队列中。

    处理请求

    该阶段主要任务是从共享队列中获取call对象,执行对应的函数调用,并将结果返回给客户端,这全部由Handler线程完成。

    Server端可同时存在多个Handler线程,它们并行从共享队列中读取Call对象,经执行对应的函数调用后,将尝试着直接将结果返回给对应的客户端。但考虑到某些函数调用返回结果很大或者网络速度很慢,可能难以将结果一次性发送给客户端,此时Handler将尝试着将后续发送任务交给Responder线程

    private class Handler extends Thread { public Handler(int instanceNumber) { this.setDaemon(true); this.setName("IPC Server handler "+ instanceNumber + " on " + port); } @Override public void run() { LOG.debug(Thread.currentThread().getName() + ": starting"); SERVER.set(Server.this); ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); while (running) { TraceScope traceScope = null; try { final Call call = callQueue.take(); // pop the queue; maybe blocked here ......

    返回结果

    Server仅存一个Responder线程,它的内部包含一个Selector对象,用于监听SelectionKey.OP_WRITE事件。当Handler没能将结果一次性发送到客户端时,会向该Selector对象注册SelectionKey.OP_WRITE事件,进而由Responder线程采用异步方式继续发送未发送完成的结果。

    /** * 函数调用返回结果过大或者网络异常情况(网络过慢), * 仅存一个selector对象,用于监听SelectionKey.OP_WRITE * */ private class Responder extends Thread { private final Selector writeSelector; private int pending; // connections waiting to register final static int PURGE_INTERVAL = 900000; // 15mins Responder() throws IOException { this.setName("IPC Server Responder"); this.setDaemon(true); writeSelector = Selector.open(); // create a selector pending = 0; } @Override public void run() { LOG.info(Thread.currentThread().getName() + ": starting"); SERVER.set(Server.this); try { doRunLoop(); } finally { LOG.info("Stopping " + Thread.currentThread().getName()); try { writeSelector.close(); } catch (IOException ioe) { LOG.error("Couldn't close write selector in " + Thread.currentThread().getName(), ioe); } } }

    七. Hadoop RPC参数调优

    Reader线程数目。由参数ipc.server.read.threadpool.size配置,默认1,默认情况下,一个RPC Server 只包含一个Reader线程。

    每个Handler线程对应最大Call数目。由参数ipc.server.handler.queue.size指定,默认100,默认情况下灭个Handler线程对应的Call队列长度100。

    Handler线程数目。在Hadoop中,ResourceManager和NameNode分别是Yarn和HDFS两个子系统中的RPC Server ,其对应Handler数目分别为参数yarn.resourcemananger.resource-tracker.client.thread-count 和 dfs.namenode.service.handler.count 指定。默认分别为50和10,当集群规模较大时,这两个参数值会大大影响性能。

    客户端最大重试次数。ipc.client.connect.max.retries指定,默认10。

    文献

    《Hadoop技术内幕 深入解析YARN架构设计与实现原理》

    Processed: 0.036, SQL: 9