1、理论
mapreduce 定义了一个编程模型,由 map 函数进行输入处理,map 函数处理完毕后产生中间文件。中间文件又作为 reduce 的输入,reduce 函数输入后进行处理,输出最终的结果。
mapreduce 能做什么,比如最简单的 wordcount。假设输入有一批大规模的文件,大到单机根本处理不了,我们要计算这批文件中每个单词出现的次数,那么我们就可以利用 mapreduce。mapper 函数的输入是一堆文件(假设文件存储在分布式文件系统,例如 GFS 上),输出是 <key,value> 格式,key 是每个单词,value 是频数 "1"。那么最后每个 map 函数的输出类似于这样:
police 1
suffered 1
police 1
Young 1
Sinks 1
Young 1
执行完毕 mapper 函数后,结果会发给 reduce worker,reduce worker 会将相同的 key 的 value 合并到一起,类似于这样:
police [1,1]
suffered [1]
Young [1,1]
Sinks [1]
我这里是用逗号隔开了,实际上是会将 value 合成 list。
reduce 函数接收输入,将结果相同 key 到结果累加,然后输出到本地文件或者 GFS 上。如果后续没有其他程序,那么 mapreduce 框架会将最终结果输出,有的话可以当作其他 mapreduce 输入。
整个过程用公式表示为(注意 reduce 的 value 是 list,因为 reduce worker 在执行 reduce 函数之前自动将 key 相同的合并):
map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)
map 调用可以分布在不同机器上,因为可以将输入文件分割成不同的部分发往不同 map 的机器。使用分区函数将 Map 调用产生的中间 key 值分成 R 个不同分区(例如,hash(key) mod R),Reduce 也可以分布到多台机器上执行。分区数量(R)和分区函数由用户来指定(要执行几个 reduce work R 就等于几,为了负载均衡用的)
当用户调用 MapReduce 函数时,将发生下面的一系列动作:
- (1)用户程序首先调用的 MapReduce 库将输入文件分成 M 个数据片度,每个数据片段的大小一般从16MB 到 64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本。
- (2)这些程序副本中的有一个特殊的程序–master。副本中其它的程序都是 worker 程序,由 master 分配任务。有 M 个 Map 任务和 R 个 Reduce 任务将被分配,master 将一个 Map 任务或 Reduce 任务分配给一个空闲的 worker。
- (3)被分配 map 任务的 work 读取输入的数据,解析数据中的 <kev,value> 键值对(输入数据的 key 我们不关心,只关注 value)并传递给 map 函数。map 函数执行用户编写的逻辑产生 <key,value> 中间数据,并缓存在内存中。
- (4)缓存中的 key/value pair 通过分区函数分成 R 个区域,之后周期性的写入到本地磁盘上。缓存的key/value pair 在本地磁盘上的存储位置将被回传给 master,由 master 负责把这些存储位置再传送给Reduce worker。
- (5)当 Reduce worker 程序接收到 master 程序发来的数据存储位置信息后,使用 RPC 从 Map worker 所在主机的磁盘上读取这些缓存数据。当 Reduce worker 读取了所有的中间数据后,通过对 key 进行排序后使得具有相同 key 值的数据聚合在一起。由于许多不同的 key 值会映射到相同的 Reduce 任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。
- (6)Reduce worker 程序遍历排序后的中间数据,对于每一个唯一的中间 key 值,Reduce worker 程序将这个 key 值和它相关的中间 value 值的集合传递给用户自定义的 Reduce 函数。Reduce 函数的输出被追加到所属分区的输出文件。
- (7)当所有的 Map 和 Reduce 任务都完成之后,master 唤醒用户程序。在这个时候,在用户程序里的对MapReduce 调用才返回。
关于上面步骤5之前,可以用 combiner 函数现将相同重复的 key 先合并一下,就避免发送大量的 <key, 1> 之类的情况,猜合在 map worker 本地执行。
关于步骤5,论文上说相同 key 合并在 reduce worker 上。
2、设计
状态说明
mapreduce 是一个非常典型的 master、slave 模式,针对这种模式,比较常见的情况是 master 维护所有 slave 的信息,然后针对其中存活的 slave 进行任务分配,slave 可以运行分配的任务。但这种情况需要额外维护 slave 的状态,增加代码复杂度。
比较简单的做法是 master 只维护任务的状态,任务的状态有三种:Idle、InProgress、Completed。任务刚创建的时候是 Idle 状态,分配给 slave 是 InPregress,slave 完成任务汇报给 master 后,任务会变成 Completed 状态。而 slave 是 stateless,只负责周期性的向 master 拉取任务即可。
针对整个 mapreduce 流程,我们首先设计两个类,master 与 worker(不叫 slave 是因为我们代码没叫,与代码保持一致)。因为流程图中分为两个阶段,Map 和 Reduce 阶段,所以 master 的状态(亦或者是这组任务的状态)有两个阶段 Map、Reduce 状态。然后两个阶段是串行的,即等到 Map 任务全都结束才进行 Reduce 任务。如果 worker 过多的情况下,有 worker 可能得不到任务,我们就可以让此 worker wait 一段时间再拉取,所以 master 再增加个 Wait 状态。等到 Reduce 阶段结束的时候,master 就进入 Exit 阶段,让 reduce worker 退出(不是必须)。
阶段状态的类为 TaskStateEnum,为了简略,将 master 的阶段与任务的种类合并到一起了:
/**
* @author xushu
* @create 8/10/21 9:06 PM
* @desc 任务类型,即所处阶段
*/
public enum TaskStateEnum {
Map(0),
Reduce(1),
Exit(2),
Wait(3);
private int type;
TaskStateEnum(int type) {
this.type = type;
}
}
接下来要说任务的详细信息了。任务 Task 具有最开始的输入(文件的位置等),任务的状态(TaskStateEnum),每个 Map 输出需要切分的分区(nReduce),task 的 id(TaskNumber),中间结果(Map 的输出,即 intermediates),最后的输出结果(output)。
@Getter
@Setter
@Builder
public class Task implements Serializable {
/**
* 输入文件
*/
private String input;
/**
* task 状态
*/
private TaskStateEnum taskState;
/**
* 切分的 redcue 数量
*/
private int nReduce;
/**
* 任务 id
*/
private int TaskNumber;
/**
* 中间结果文件,即被切分的 reduce 文件
*/
private List<String> intermediates;
/**
* 最后的输出结果
*/
private String output;
}
任务放在 taskQueue 中,但是任务状态的维护放在 taskMetaInfo 中。taskMetaInfo 维护任务的状态,是 Idle、InProgress、Completed;任务的分配时间(startTime),主要用来重发超时的任务;放入 taskQueue 中的 task 的引用,这里有一个 trik,因为 task 被 taskQueue 移除后,taskMetaInfo 记录了 task,超时会重新放进去,这样就不用额外维护一个等待队列。
@Getter
@Setter
@Builder
public class TaskMetaInfo {
/**
* 任务的状态,空闲、流程中、完成
*/
private TaskMetaInfoEnum taskMetaInfoEnum;
/**
* 任务分配时间
*/
private Long startTime;
/**
* 对应的任务,为啥要这样,因为 task 被 taskQueue 移除后,taskMetaInfo 记录了 task,超时会重新放进去
*/
private Task task;
}
1)master 启动
master 启动之后,首先会标明自己现在所处 Map 阶段。接着继续创建 Map 任务,它会将文件位置都封装到 Task 中,然后将 Task 放入 taskQueue 中,然后创建用户处理 processor,主要用来处理 PULL_TASK、TASK_COMPLETED 这两类消息。初始化完毕后,会创建处理 timeout 任务的 task(我这里直接 while 循环了)。
/**
* 处理超时异常
*/
private void catcheTimeOutTask() {
while(true){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock){
if(TaskStateEnum.Exit.equals(this.masterPhase)){
return;
}
// 超时则设为空闲,重新拉取
for (Integer taskIndex : taskMeta.keySet()) {
TaskMetaInfo taskMetaInfo = taskMeta.get(taskIndex);
if(TaskMetaInfoEnum.InProgress.equals(taskMetaInfo.getTaskMetaInfoEnum())
&& (System.currentTimeMillis() - taskMetaInfo.getStartTime()) / 1000 > 10){
// 超时了重新放进去
this.taskQueue.add(taskMetaInfo.getTask());
taskMetaInfo.setTaskMetaInfoEnum(TaskMetaInfoEnum.Idle);
}
}
}
}
}
master 收到 worker PULL_TASK 消息后,会将任务从 taskQueue 取出来,然后发送给 worker。
/**
* 分配任务
* @return
*/
private Response assginTask() {
synchronized (lock){
if(taskQueue.size() > 0){
// 有就发出去
Task task = taskQueue.poll();
//记录启动时间
TaskMetaInfo masterTask = taskMeta.get(task.getTaskNumber());
masterTask.setStartTime(System.currentTimeMillis());
masterTask.setTaskMetaInfoEnum(TaskMetaInfoEnum.InProgress);
return Response.builder()
.result(task)
.build();
}else if(TaskStateEnum.Exit.equals(this.masterPhase)){
return Response.builder()
.result(Task.builder().taskState(TaskStateEnum.Exit).build())
.build();
}else {
// 没有 task 就让 worker 等待
return Response.builder()
.result(Task.builder().taskState(TaskStateEnum.Wait).build())
.build();
}
}
}
2)worker Map 流程
worker 创建完毕后,就进入拉取任务的环节,它会不断向 master 发送拉取任务的消息,然后根据任务的种类做相应的处理:
/**
* 向 master 拉取任务
*/
private void pullTask() {
while (true){
try {
Request request = Request.builder()
.messageType(MessageType.PULL_TASK)
.build();
Response response = (Response) this.rpcClient.invokeSync(this.coordinatorAddress, request, 2000);
Task task = (Task) response.getResult();
switch (task.getTaskState()){
case Map:
handlerAssignMapTask(task);
break;
case Reduce:
handlerAssignReduceTask(task);
break;
case Wait:
Thread.sleep(5000);
break;
case Exit:
System.exit(-1);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
第一阶段拉取的是 Map 任务,会读取文件内容到内存,然后调用用户预先写好的 Map 函数。Map 函数主要处理输入的内容,然后输出 <key,value> 格式,例如 <'apple', '1'>。
public static List<KeyValue> mapFunction(List<String> lines) {
List<KeyValue> list = new ArrayList<>();
for (String line : lines) {
StringTokenizer tokenizer = new StringTokenizer(line, ",!' '.;()*--[]:\"\"?#$&!@");
while (tokenizer.hasMoreTokens()){
list.add(KeyValue.builder()
.key(tokenizer.nextToken())
.value("1")
.build());
}
}
return list;
}
输出完毕后,会根据用户指定的 NReduce 将输出切分成 NReduce 份保存到本地。保存完毕后,会将这些中间文件的位置收集,向 mater 发送任务处理完毕的消息。整个流程如下:
/**
* 处理 map 任务
* @return
*/
private void handlerAssignMapTask(Task task) {
try {
String fileName = task.getInput();
File file = new File(fileName);
if(!file.exists()){
throw new Exception("文件不存在");
}
List<String> lines = FileUtils.readLines(file, "UTF-8");
List<KeyValue> keyValues = Wc.mapFunction(lines);
List<List<String>> reduceList = new ArrayList<>();
for(int i = 0; i < task.getNReduce(); i++){
reduceList.add(new ArrayList<>());
}
// 分成 R 份,这边可以执行 combiner 函数先合并好
for (KeyValue keyValue : keyValues) {
String line = keyValue.getKey() + " " + keyValue.getValue();
//切分方式是根据key做hash,分成 R 份
int index = Math.abs(keyValue.getKey().hashCode() % task.getNReduce());
List<String> subList = reduceList.get(index);
subList.add(line);
}
List<String> intermediates = new ArrayList<>();
for (int i = 0; i < reduceList.size(); i++) {
String outPutFileName = "/Users/xushu/MyGithub Project/mit6824/src/main/java/labs/lab1/mapreduce2/file/mr_map_out_taskId" + task.getTaskNumber() + "_intermediate" + i + "_" + this.selfAddress + ".txt";
File outFile = new File(outPutFileName);
FileUtils.writeLines(outFile, reduceList.get(i));
intermediates.add(outPutFileName);
}
task.setIntermediates(intermediates);
// 通知 master 创建结束
taskCompleted(task);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 任务完成通知 master
* @param task
* @throws Exception
*/
private void taskCompleted(Task task) throws Exception {
Request request = Request.builder()
.messageType(MessageType.TASK_COMPLETED)
.obj(task)
.build();
this.rpcClient.invokeWithCallback(this.coordinatorAddress, request, null,3000);
}
3)master 处理任务完成结果
master 收到 worker 的 TASK_COMPLETED 消息后,会判断 Map 阶段有没有结束,结束就进入 Reduce 阶段,没有则只收集中间文件的位置信息。如果 Map 阶段结束,则会创建 Reduce 任务,进行 Reduce 阶段。
/**
* 处理完成的任务
* @param task
*/
private void taskCompleted(Task task) {
synchronized (lock){
TaskMetaInfo taskMetaInfo = this.taskMeta.get(task.getTaskNumber());
// 不是此阶段的任务或者任务已经完成,则忽略
if(!task.getTaskState().equals(this.masterPhase) || TaskMetaInfoEnum.Completed.equals(taskMetaInfo.getTaskMetaInfoEnum())){
return;
}
this.taskMeta.get(task.getTaskNumber()).setTaskMetaInfoEnum(TaskMetaInfoEnum.Completed);
switch (task.getTaskState()){
case Map:
// map 任务的结果以任务 id 分开
for (int i = 0; i < task.getIntermediates().size(); i++) {
List<String> filePath = this.intermediates.getOrDefault(i, new ArrayList<>());
filePath.add(task.getIntermediates().get(i));
this.intermediates.put(i, filePath);
}
// 所有任务完成才进入 reduce 阶段
if(allTaskDone()){
createReduceTask();
this.masterPhase = TaskStateEnum.Reduce;
}
break;
case Reduce:
// reduce 所有任务完成就退出
if(allTaskDone()){
this.masterPhase = TaskStateEnum.Exit;
}
break;
}
}
}
整个流程与论文描述的流程基本类似,就是一个很经典的 master、slave 模式,mater 维护任务的元数据信息,负责分发任务;slave 负责处理任务来汇报任务结果。
当然,为了简单其实可以根据 springboot + 数据库的模式,springboot 负责写 master、slave 应用,数据库负责记录任务信息,这样会大大简化工作量,且更易于在生产中应用。
还有,关于重复的任务怎么办?直接抛弃后面一个,因为master代码中关于任务完成的代码中有这样一段处理,如果不是此阶段的任务或者任务已经完成,则直接结束。
3、参考资料
mapreduce 论文
https://github.com/s09g/mapreduce-go