简介
与MRv1相比Yarn的设计要复杂得多。YARN借用了MRv1的一些底层基础库(如RPC库),又引入了很多新的设计方式,它的基础库更多,如:google的开源序列化框架Protocol Buffers、Apache Avro、自定义的服务库、事件库和状态机等。
Yarn基础库是其它Yarn模块的基础,它的设计直接决定了YARN的稳定性和扩展性,概括起来,YARN的基础库主要有以下几个:
- Protocol Buffers
- Apache Arvo
- RPC
- 服务库和时间库
- 状态机
接下来我们将一一介绍。
Protocol Buffers
简介
是Google的开源的序列化库,常有做数据存储,消息通信具有以下优点:
- 跨平台
- 高新能,体积小
- 使用简单
- 兼容性好
Protocol Buffers示例
PersonProtos.proto
option java_package = "com.thougtworks.demo";
option java_outer_classname = "PersonProtos";
message Person {
required string name = 1;
required int32 id = 2;
optional string email = 3;
message PhoneNumber {
required string number = 1;
required int32 type = 2;
}
repeated PhoneNumber phone = 4;
}
使用如下命令进行编译
protoc --java_out=. PersonProtos.proto
最终生成如下代码:
Apache Arvo
概念
它的RPC框架,具有平台无关,支持动态模式(无需编译)等优点。
目前只用于yarn日志序列化库中,以及MR所有事件采用Arvo做序列化和反序列化。
示例
user.avro
{
"namespace": "com.thoughtworks.demo.avro",
"type": "record",
"name": "User",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "favorite_number",
"type": [
"int",
"null"
]
},
{
"name": "favorite_color",
"type": [
"string",
"null"
]
}
]
}
使用如下命令进行编译
java -jar avro-tools-1.8.2.jar compile schema user.avro
Hadoop RPC
RPC简介
RPC (Remote Procedure Call) 通过它解决的两个问题:
- 解决分布式系统中,服务之间的调用问题。
- 远程调用时,要能够像本地调用一样方便,让调用者感知不到远程调用的逻辑。
如何实现远程过程调用?
如上图, 当我们使用RPC完成一个远程加法调用是如何处理的的呢?
Client端需要调用了远程的一个Calculator的一个实现类的add方法,这种通过远程调用Server端的RPC接口,来获取运算结果,因此称之为Stub。Stub可以认为它是一个代理;
Stub怎么和Server端建立远程通讯呢?这时候就要用到远程通讯类库,也就是图中的Run-time Library,它的而实现有Java的Socket、Apache Arvo、Apache swift等;
Stub通过调用通讯工具提供的方法,和Server建立起了通讯,然后将请求数据发给Server。需要注意的是,由于底层的网络通讯是基于二进制格式的,因此这里Stub传给Run-time Library的数据也必须是二进制,比如calculator.add(1,2),你必须把参数值1和2放到一个Request对象里头(这个Request对象当然不只这些信息,还包括要调用哪个服务的哪个RPC接口等其他信息),然后序列化为二进制,再传给Run-time Library;
二进制的数据传到Server端,Server端当然也有自己的通讯类库,通过它接收二进制的请求。既然数据是二进制的,那么自然要进行反序列化了,将二进制的数据反序列化为请求对象,然后将这个请求对象交给Server端的Stub处理;
和之前的Client端的Stub一样,这里的Stub也同样是个“假玩意”,它所负责的,只是去解析请求对象,知道调用方要调的是哪个RPC接口,传进来的参数又是什么,然后再把这些参数传给对应的RPC接口,也就是Calculator的实际实现类去执行;
RPC接口执行完毕,返回执行结果,现在轮到Server端要把数据发给Client了。这里是一样的道理,一样的流程,只是相互的位置进行交换。
Hadoop RPC
Hadoop RPC主要对外提供了两个方法:
- public static <T> ProtocolProxy<T> getProtocolProxy得到RPC的客户端。
- public Builder(Configuration conf) 构造服务端对象。
Hadoop RPC主要由RPC、Client、Server三大类组成
1,RPC类分析
RPC类是对client-server网络模型的封装,为程序提供一套更方便更简洁的编程接口。
如上图,getProxy和waitForProxy是构建PRC的客户端, stop为销毁方法,set/getProtocolEngine是用来设置或者获得序列化的实现。
2,Client类分析
Client主要完成发PRC信息并接收结果。
如上图, Client内部有两个非常重要的内部类,Call和Connection。
Call类,包含5个成员变量,分别是:
id:唯一标示,标示调用哪个远程的接口
retry:重试次数
rpcRequest:序列化后请求信息
rpcResponse:序列化后返回值信息
error: 错误信息
done:是否执行完成的标示
rpcKind:RPC序列化的类型,目前支持WritableRpcEngine ProtobufRpcEngineConnection类,Client与每个Server之间的一个连接
包含如下几个主要的变量和方法
remoteId:client与server连接的唯一标示
server:server的endpoint
socket:与Server通信的socket
in:输入流
out:输入流
calls:保存RPC请求的hash table
addCall:将一个请求添加到hash table中
sendParam:发送RPC请求
receiveRespone:接受服务端的RPC请求
run:调用receiveRespone,会一直等待接受PRC返回的结果
Client处理流程
如上图示:
- 1,创建一个Connection对象,并将远程方法调用信息封装成Call,放到HashTable中。
- 2,调用sendRpcRequest方法将当前Call对象发送给Server。
- 3,Server处理预案RPC的请求后,将结果通过网络返回给Client,Client端通过receivePRCRespone函数返回结果。
- 4,Client检查结果(成功or失败),并将对应Call对象冲HashTable中删除。
Server类分析
Reactor模式
Reactor模式中文叫做反应器模式,是用来解决服务器端高性能并发的问题。Netty、Redis都使其解决高并发的问题。
示例代码:
package com.wing.test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException { //Reactor初始化
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
//非阻塞
serverSocket.configureBlocking(false);
//分步处理,第一步,接收accept事件
SelectionKey sk =
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//attach callback object, Acceptor
sk.attach(new Acceptor());
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
//Reactor负责dispatch收到的事件
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
//调用之前注册的callback对象
if (r != null) {
r.run();
}
}
// inner class
class Acceptor implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
if (channel != null)
new MthreadHandler(selector, channel);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
package com.wing.test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class MthreadHandler implements Runnable
{
final SocketChannel channel;
final SelectionKey selectionKey;
ByteBuffer input = ByteBuffer.allocate(1024);
ByteBuffer output = ByteBuffer.allocate(3072);
static final int READING = 0, SENDING = 1;
int state = READING;
ExecutorService pool = Executors.newFixedThreadPool(2);
static final int PROCESSING = 3;
MthreadHandler(Selector selector, SocketChannel c) throws IOException
{
channel = c;
c.configureBlocking(false);
// Optionally try first read now
selectionKey = channel.register(selector, 0);
//将Handler作为callback对象
selectionKey.attach(this);
//第二步,注册Read就绪事件
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
boolean inputIsComplete()
{
/* ... */
return false;
}
boolean outputIsComplete()
{
/* ... */
return false;
}
void process()
{
/* ... */
return;
}
public void run()
{
try
{
if (state == READING)
{
read();
}
else if (state == SENDING)
{
send();
}
} catch (IOException ex)
{ /* ... */ }
}
synchronized void read() throws IOException
{
// ...
channel.read(input);
if (inputIsComplete())
{
state = PROCESSING;
//使用线程pool异步执行
pool.execute(new Processer());
}
}
void send() throws IOException
{
channel.write(output);
//write完就结束了, 关闭select key
if (outputIsComplete())
{
selectionKey.cancel();
}
}
synchronized void processAndHandOff()
{
process();
state = SENDING;
// or rebind attachment
//process完,开始等待write就绪
selectionKey.interestOps(SelectionKey.OP_WRITE);
}
class Processer implements Runnable
{
public void run()
{
processAndHandOff();
}
}
}
Yarn Server中Reactor模式
- Reactor: I/O事件的派发者。
- Acceptor:接受Client的连接,并指派给一个Handler。
- Handler 与一个Client的通信实体,具体实现业务处理。
- Reader/Sender: 为加速处理,Reactor模式构建一个存放数据的处理线程的线程池。这样数据读出后或者写入前,立即放到线程池中执行。
Server端的处理流程是什么样的呢?如下图
接收请求
接收来自各个客户端的RPC请求,装成Call类,并放到callQueue中,以便进行后续处理。该阶段内部又分为建立连接和接收请求两个子阶段,分别由Listener和Reader两种线程完成。
整个Server只有一个Listener线程,统一负责监听来自客户端的连接请求,一旦有新的请求到达,它会采用轮询的方式从线程池中选择一个Reader线程进行处理,而Reader线程可同时存在多个,它们分别负责接收一部分客户端连接的RPC请求,至Listener决定,当前Listener只是采用了简单的轮询分配机制。
Listener和Reader线程内部各自包含一个Selector对象,分别用于监听SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。对于Listener线程,主循环的实现体是监听是否有新的连接请求到达,并采用轮询策略选择一个Reader线程处理新连接;对于Reader线程,主循环的实现体是监听(它负责的那部分)客户端连接中是否有新的RPC请求到达,并将新的RPC请求封装成Call对象,放到共享队列callQueue中。处理请求
该阶段主要任务是从共享队列callQueue中获取Call对象,执行对应的函数调用,并将结果返回给客户端,这全部由Handler线程完成。
Server端可同时存在多个Handler线程,它们并行从共享队列中读取Call对象,经执行对应的函数调用后,将尝试着直接将结果返回给对应的客户端。但考虑到某些函数调用返回结果很大或者网络速度过慢,可能难以将结果一次性发送到客户端,此时Handler将尝试着将后续发送任务返回结果
前面提到,每个Handler线程执行完函数调用后,会尝试着将执行结果返回给客户端,但对于特殊情况,比如函数调用返回结果过大或者网络异常情况(网速过慢),会将发送任务交给Responder线程。
Server端仅存在一个Responder线程,它的内部包含一个Selector对象,用于监听SelectionKey.OP_WRITE事件。当Handler没能将结果一次性发送到客户端时,会向该Selector对象注册SelectionKey.OP_WRITE事件,进而由Responder线程采用异步方式继续发送未发送完成的结果。
4,在Yarn中如何自定义的一个RPC服务
- 定义一个PRC协议
interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol {
//版本号,默认情况下,不同版本号的RPC Client和Server之间不能相互通信
public static final long versionID = 1L;
String echo(String value) throws IOException;
int add(int v1, int v2) throws IOException;
}
- 实现一个PRC协议
public static class ClientProtocolImpl implements ClientProtocol {
//重载的方法,用于获取自定义的协议版本号,
public long getProtocolVersion(String protocol, long clientVersion) {
return ClientProtocol.versionID;
}
//重载的方法,用于获取协议签名
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
inthashcode) {
return new ProtocolSignature(ClientProtocol.versionID, null);
}
public String echo(String value) throws IOException {
return value;
}
public int add(int v1, int v2) throws IOException {
return v1 + v2;
}
}
- 构造并启动RPC Server
Server server = new RPC.Builder(conf)
.setProtocol(ClientProtocol.class)
.setInstance(new ClientProtocolImpl())
.setBindAddress(ADDRESS)
.setPort(8080)
.setNumHandlers(5).build();
server.start();
- 构造RPC Client并发送RPC请求
proxy = (ClientProtocol)RPC.getProxy(ClientProtocol.class, ClientProtocol.versionID, addr, conf);
int result = proxy.add(5, 6);
String echoResult = proxy.echo("result");
服务库和事件库
1,服务库
YARN对于生命周期较长的对象,采用了基于服务的对象管理模型对其进行管理,该模型主要有以下几个特点:
- 每个服务对象分为4个状态:
NOTINITED(被创建)
INITED(已初始化)
STARTED(已启动)
STOPPED(已停止) - 任何服务状态变化都可以触发Action。
- 可通过组合的方式对任意服务进行组合,以便进行统一管理。
如上图,所有的服务对象最终均实现了接口Service,它定义了最基本的服务初始化、启动、停止等操作,而AbstractService类提供了一个最基本的Service实现。YARN中所有对象,如果是非组合服务,直接继承AbstractService类即可,否则需继承CompositeService。比如ResourceManager。
事件库
YARN采用了基于事件驱动的并发模型,该模型能够大大增强并发性,从而提高系统整体性能。为了构建该模型,YARN将各种处理逻辑抽象成事件和对应事件调度器,并将每类事件的处理过程分割成多个步骤,用有限状态机表示。
如上图,具体过程:处理请求会作为事件进入系统,由中央异步调度器(AsyncDispatcher)负责传递给相应事件调度器(Event Handler)。该事件调度器可能将该事件转发给另外一个事件调度器,也可能交给一个带有有限状态机的事件处理器,其处理结果也以事件的形式输出给中央异步调度器。而新的事件会再次被中央异步调度器转发给下一个事件调度器,直至处理完成(达到终止条件)。
在YARN中,所有核心服务实际上都是一个中央异步调度器,包括ResourceManager、NodeManager、MRAppMaster等,它们维护了事先注册的事件与事件处理器,并根据接收的事件类型驱动服务的运行。
当使用YARN事件库时,通常先要定义一个中央异步调度器AsyncDispatcher,负责事件的处理与转发,然后根据实际业务需求定义一系列事件Event与事件处理器EventHandler,并注册到中央异步调度器中以实现事件统一管理和调度。
状态机
状态机模式
状态机:由一组状态组成,这些状态分为三类:初始状态、中间状态和最终状态。状态机从初始状态开始运行,经过一系列中间状态后,到达最终状态并退出。在一个状态机中,每个状态都可以接收一组特定事件,并根据具体的事件类型转换到另一个状态。当状态机转换到最终状态时,则退出。
状态机解决的问题:解决复杂状态下,状态流转过程中的可扩展性,可阅读性。
如上图是示:
- Context:环境类
- State:抽象状态类
- ConcreteState:具体状态类
示例:
public class StateMachineTest {
public static void main(String[] args) {
Context context = new ContextImpl();
context.state(ColorState.WHITE);
while (context.state().process(context)) ;
}
}
public interface Context {
State state();
void state(State state);
}
public class ContextImpl implements Context {
State state;
@Override
public State state() {
return state;
}
@Override
public void state(State state) {
this.state = state;
}
}
interface State {
boolean process(Context context);
}
public enum ColorState implements State {
RED {
public boolean process(Context context) {
context.state(ColorState.GREEN);
System.out.println("Current State = " + this);
return true;
}
},
GREEN {
public boolean process(Context context) {
context.state(ColorState.BLACK);
System.out.println("Current State = " + this);
return true;
}
},
BLACK {
public boolean process(Context context) {
context.state(ColorState.YELLOW);
System.out.println("Current State = " + this);
return true;
}
},
YELLOW {
public boolean process(Context context) {
context.state(ColorState.WHITE);
System.out.println("Current State = " + this);
return true;
}
},
WHITE {
public boolean process(Context context) {
context.state(ColorState.BLUE);
System.out.println("Current State = " + this);
return true;
}
},
BLUE {
public boolean process(Context context) {
context.state(ColorState.RED);
System.out.println("Current State = " + this);
return false;
}
};
public abstract boolean process(Context context);
}
Yarn状态机
在Yarn中,每种状态转换由一个四元组表示,分别是转换前状态(preState)、转换后状态(postState)、事件(event)和回调函数(hook)。YARN定义了三种状态转换方式,具体如下:
- 一个初始状态、一个最终状态、一种事件。该方式表示状态机在preState状态下,接收到Event事件后,执行函数状态转移函数Hook,并在执行完成后将当前状态转换为postState。
- 一个初始状态、多个最终状态、一种事件。该方式表示状态机在preState状态下,接收到Event事件后,执行函数状态转移函数Hook,并将当前状态转移为函数Hook的返回值所表示的状态。
- 一个初始状态、一个最终状态、多种事件。该方式表示状态机在preState状态下,接收到Event1、Event2和Event3中的任何一个事件,将执行函数状态转移函数Hook,并在执行完成后将当前状态转换为postState。
Yarn状态机类的具体实现
如上图:YARN对外提供了一个状态机工厂StatemachineFactory,它提供多种addTransition方法供用户添加各种状态转移,一旦状态机添加完毕后,可通过调用installTopology完成一个状态机的构建。