lab1:MapReduce

1、理论

mapreduce 定义了一个编程模型,由 map 函数进行输入处理,map 函数处理完毕后产生中间文件。中间文件又作为 reduce 的输入,reduce 函数输入后进行处理,输出最终的结果。

mapreduce 能做什么,比如最简单的 wordcount。假设输入有一批大规模的文件,大到单机根本处理不了,我们要计算这批文件中每个单词出现的次数,那么我们就可以利用 mapreduce。mapper 函数的输入是一堆文件(假设文件存储在分布式文件系统,例如 GFS 上),输出是 <key,value> 格式,key 是每个单词,value 是频数 "1"。那么最后每个 map 函数的输出类似于这样:

police 1
suffered 1
police 1
Young 1
Sinks 1
Young 1

执行完毕 mapper 函数后,结果会发给 reduce worker,reduce worker 会将相同的 key 的 value 合并到一起,类似于这样:

police [1,1]
suffered [1]
Young [1,1]
Sinks [1]

我这里是用逗号隔开了,实际上是会将 value 合成 list。

reduce 函数接收输入,将结果相同 key 到结果累加,然后输出到本地文件或者 GFS 上。如果后续没有其他程序,那么 mapreduce 框架会将最终结果输出,有的话可以当作其他 mapreduce 输入。

整个过程用公式表示为(注意 reduce 的 value 是 list,因为 reduce worker 在执行 reduce 函数之前自动将 key 相同的合并):

map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)

map 调用可以分布在不同机器上,因为可以将输入文件分割成不同的部分发往不同 map 的机器。使用分区函数将 Map 调用产生的中间 key 值分成 R 个不同分区(例如,hash(key) mod R),Reduce 也可以分布到多台机器上执行。分区数量(R)和分区函数由用户来指定(要执行几个 reduce work R 就等于几,为了负载均衡用的)


mapreduce 流程

当用户调用 MapReduce 函数时,将发生下面的一系列动作:

  • (1)用户程序首先调用的 MapReduce 库将输入文件分成 M 个数据片度,每个数据片段的大小一般从16MB 到 64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本。
  • (2)这些程序副本中的有一个特殊的程序–master。副本中其它的程序都是 worker 程序,由 master 分配任务。有 M 个 Map 任务和 R 个 Reduce 任务将被分配,master 将一个 Map 任务或 Reduce 任务分配给一个空闲的 worker。
  • (3)被分配 map 任务的 work 读取输入的数据,解析数据中的 <kev,value> 键值对(输入数据的 key 我们不关心,只关注 value)并传递给 map 函数。map 函数执行用户编写的逻辑产生 <key,value> 中间数据,并缓存在内存中。
  • (4)缓存中的 key/value pair 通过分区函数分成 R 个区域,之后周期性的写入到本地磁盘上。缓存的key/value pair 在本地磁盘上的存储位置将被回传给 master,由 master 负责把这些存储位置再传送给Reduce worker。
  • (5)当 Reduce worker 程序接收到 master 程序发来的数据存储位置信息后,使用 RPC 从 Map worker 所在主机的磁盘上读取这些缓存数据。当 Reduce worker 读取了所有的中间数据后,通过对 key 进行排序后使得具有相同 key 值的数据聚合在一起。由于许多不同的 key 值会映射到相同的 Reduce 任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。
  • (6)Reduce worker 程序遍历排序后的中间数据,对于每一个唯一的中间 key 值,Reduce worker 程序将这个 key 值和它相关的中间 value 值的集合传递给用户自定义的 Reduce 函数。Reduce 函数的输出被追加到所属分区的输出文件。
  • (7)当所有的 Map 和 Reduce 任务都完成之后,master 唤醒用户程序。在这个时候,在用户程序里的对MapReduce 调用才返回。

关于上面步骤5之前,可以用 combiner 函数现将相同重复的 key 先合并一下,就避免发送大量的 <key, 1> 之类的情况,猜合在 map worker 本地执行。

关于步骤5,论文上说相同 key 合并在 reduce worker 上。

2、设计

状态说明

mapreduce 是一个非常典型的 master、slave 模式,针对这种模式,比较常见的情况是 master 维护所有 slave 的信息,然后针对其中存活的 slave 进行任务分配,slave 可以运行分配的任务。但这种情况需要额外维护 slave 的状态,增加代码复杂度。

比较简单的做法是 master 只维护任务的状态,任务的状态有三种:Idle、InProgress、Completed。任务刚创建的时候是 Idle 状态,分配给 slave 是 InPregress,slave 完成任务汇报给 master 后,任务会变成 Completed 状态。而 slave 是 stateless,只负责周期性的向 master 拉取任务即可。

针对整个 mapreduce 流程,我们首先设计两个类,master 与 worker(不叫 slave 是因为我们代码没叫,与代码保持一致)。因为流程图中分为两个阶段,Map 和 Reduce 阶段,所以 master 的状态(亦或者是这组任务的状态)有两个阶段 Map、Reduce 状态。然后两个阶段是串行的,即等到 Map 任务全都结束才进行 Reduce 任务。如果 worker 过多的情况下,有 worker 可能得不到任务,我们就可以让此 worker wait 一段时间再拉取,所以 master 再增加个 Wait 状态。等到 Reduce 阶段结束的时候,master 就进入 Exit 阶段,让 reduce worker 退出(不是必须)。

阶段状态的类为 TaskStateEnum,为了简略,将 master 的阶段与任务的种类合并到一起了:

/**
 * @author xushu
 * @create 8/10/21 9:06 PM
 * @desc 任务类型,即所处阶段
 */
public enum TaskStateEnum {
    Map(0),
    Reduce(1),
    Exit(2),
    Wait(3);


    private int type;

    TaskStateEnum(int type) {
        this.type = type;
    }
}

接下来要说任务的详细信息了。任务 Task 具有最开始的输入(文件的位置等),任务的状态(TaskStateEnum),每个 Map 输出需要切分的分区(nReduce),task 的 id(TaskNumber),中间结果(Map 的输出,即 intermediates),最后的输出结果(output)。

@Getter
@Setter
@Builder
public class Task implements Serializable {

    /**
     * 输入文件
     */
    private String input;

    /**
     * task 状态
     */
    private TaskStateEnum taskState;

    /**
     * 切分的  redcue 数量
     */
    private int nReduce;

    /**
     * 任务 id
     */
    private int TaskNumber;

    /**
     * 中间结果文件,即被切分的 reduce 文件
     */
    private List<String> intermediates;

    /**
     * 最后的输出结果
     */
    private String output;

}

任务放在 taskQueue 中,但是任务状态的维护放在 taskMetaInfo 中。taskMetaInfo 维护任务的状态,是 Idle、InProgress、Completed;任务的分配时间(startTime),主要用来重发超时的任务;放入 taskQueue 中的 task 的引用,这里有一个 trik,因为 task 被 taskQueue 移除后,taskMetaInfo 记录了 task,超时会重新放进去,这样就不用额外维护一个等待队列。

@Getter
@Setter
@Builder
public class TaskMetaInfo {

    /**
     * 任务的状态,空闲、流程中、完成
     */
    private TaskMetaInfoEnum taskMetaInfoEnum;

    /**
     * 任务分配时间
     */
    private Long startTime;

    /**
     * 对应的任务,为啥要这样,因为 task 被 taskQueue 移除后,taskMetaInfo 记录了 task,超时会重新放进去
     */
    private Task task;
}
1)master 启动

master 启动之后,首先会标明自己现在所处 Map 阶段。接着继续创建 Map 任务,它会将文件位置都封装到 Task 中,然后将 Task 放入 taskQueue 中,然后创建用户处理 processor,主要用来处理 PULL_TASK、TASK_COMPLETED 这两类消息。初始化完毕后,会创建处理 timeout 任务的 task(我这里直接 while 循环了)。

 /**
     * 处理超时异常
     */
    private void catcheTimeOutTask() {
        while(true){
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            synchronized (lock){
                if(TaskStateEnum.Exit.equals(this.masterPhase)){
                    return;
                }

                // 超时则设为空闲,重新拉取
                for (Integer taskIndex : taskMeta.keySet()) {
                    TaskMetaInfo taskMetaInfo = taskMeta.get(taskIndex);
                    if(TaskMetaInfoEnum.InProgress.equals(taskMetaInfo.getTaskMetaInfoEnum())
                            && (System.currentTimeMillis() - taskMetaInfo.getStartTime()) / 1000 > 10){

                        // 超时了重新放进去
                        this.taskQueue.add(taskMetaInfo.getTask());
                        taskMetaInfo.setTaskMetaInfoEnum(TaskMetaInfoEnum.Idle);
                    }
                }
            }
        }
    }

master 收到 worker PULL_TASK 消息后,会将任务从 taskQueue 取出来,然后发送给 worker。

/**
     * 分配任务
     * @return
     */
    private Response assginTask() {

        synchronized (lock){
            if(taskQueue.size() > 0){
                // 有就发出去
                Task task = taskQueue.poll();
                //记录启动时间
                TaskMetaInfo masterTask = taskMeta.get(task.getTaskNumber());
                masterTask.setStartTime(System.currentTimeMillis());
                masterTask.setTaskMetaInfoEnum(TaskMetaInfoEnum.InProgress);

                return Response.builder()
                        .result(task)
                        .build();
            }else if(TaskStateEnum.Exit.equals(this.masterPhase)){
                return Response.builder()
                        .result(Task.builder().taskState(TaskStateEnum.Exit).build())
                        .build();
            }else {
                // 没有 task 就让 worker 等待
                return Response.builder()
                        .result(Task.builder().taskState(TaskStateEnum.Wait).build())
                        .build();
            }
        }
    }
2)worker Map 流程

worker 创建完毕后,就进入拉取任务的环节,它会不断向 master 发送拉取任务的消息,然后根据任务的种类做相应的处理:

 /**
     * 向 master 拉取任务
     */
    private void pullTask() {

        while (true){
            try {
                Request request = Request.builder()
                        .messageType(MessageType.PULL_TASK)
                        .build();
                Response response = (Response) this.rpcClient.invokeSync(this.coordinatorAddress, request, 2000);
                Task task = (Task) response.getResult();
                switch (task.getTaskState()){
                    case Map:
                        handlerAssignMapTask(task);
                        break;
                    case Reduce:
                        handlerAssignReduceTask(task);
                        break;
                    case Wait:
                        Thread.sleep(5000);
                        break;
                    case Exit:
                        System.exit(-1);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

第一阶段拉取的是 Map 任务,会读取文件内容到内存,然后调用用户预先写好的 Map 函数。Map 函数主要处理输入的内容,然后输出 <key,value> 格式,例如 <'apple', '1'>。

public static List<KeyValue> mapFunction(List<String> lines)  {
        List<KeyValue> list = new ArrayList<>();
        for (String line : lines) {
            StringTokenizer tokenizer = new StringTokenizer(line, ",!' '.;()*--[]:\"\"?#$&!@");
            while (tokenizer.hasMoreTokens()){
                list.add(KeyValue.builder()
                        .key(tokenizer.nextToken())
                        .value("1")
                        .build());
            }
        }

        return list;
    }

输出完毕后,会根据用户指定的 NReduce 将输出切分成 NReduce 份保存到本地。保存完毕后,会将这些中间文件的位置收集,向 mater 发送任务处理完毕的消息。整个流程如下:

/**
     * 处理 map 任务
     * @return
     */
    private void handlerAssignMapTask(Task task)  {
        try {
            String fileName = task.getInput();
            File file = new File(fileName);
            if(!file.exists()){
                throw new Exception("文件不存在");
            }
            List<String> lines = FileUtils.readLines(file, "UTF-8");
            List<KeyValue> keyValues = Wc.mapFunction(lines);

            List<List<String>> reduceList = new ArrayList<>();
            for(int i = 0; i < task.getNReduce(); i++){
                reduceList.add(new ArrayList<>());
            }

            // 分成 R 份,这边可以执行 combiner 函数先合并好
            for (KeyValue keyValue : keyValues) {
                String line = keyValue.getKey() + " " + keyValue.getValue();
                //切分方式是根据key做hash,分成 R 份
                int index = Math.abs(keyValue.getKey().hashCode() % task.getNReduce());
                List<String> subList = reduceList.get(index);
                subList.add(line);
            }

            List<String> intermediates = new ArrayList<>();
            for (int i = 0; i < reduceList.size(); i++) {
                String outPutFileName = "/Users/xushu/MyGithub Project/mit6824/src/main/java/labs/lab1/mapreduce2/file/mr_map_out_taskId" + task.getTaskNumber() + "_intermediate" + i + "_" + this.selfAddress + ".txt";
                File outFile = new File(outPutFileName);
                FileUtils.writeLines(outFile, reduceList.get(i));

                intermediates.add(outPutFileName);
            }

            task.setIntermediates(intermediates);

            // 通知 master 创建结束
            taskCompleted(task);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


/**
     * 任务完成通知 master
     * @param task
     * @throws Exception
     */
    private void taskCompleted(Task task) throws Exception {
        Request request = Request.builder()
                .messageType(MessageType.TASK_COMPLETED)
                .obj(task)
                .build();

        this.rpcClient.invokeWithCallback(this.coordinatorAddress, request, null,3000);
    }
3)master 处理任务完成结果

master 收到 worker 的 TASK_COMPLETED 消息后,会判断 Map 阶段有没有结束,结束就进入 Reduce 阶段,没有则只收集中间文件的位置信息。如果 Map 阶段结束,则会创建 Reduce 任务,进行 Reduce 阶段。

 /**
     * 处理完成的任务
     * @param task
     */
    private void taskCompleted(Task task) {
        synchronized (lock){
            TaskMetaInfo taskMetaInfo = this.taskMeta.get(task.getTaskNumber());
            // 不是此阶段的任务或者任务已经完成,则忽略
            if(!task.getTaskState().equals(this.masterPhase) || TaskMetaInfoEnum.Completed.equals(taskMetaInfo.getTaskMetaInfoEnum())){
                return;
            }

            this.taskMeta.get(task.getTaskNumber()).setTaskMetaInfoEnum(TaskMetaInfoEnum.Completed);
            switch (task.getTaskState()){
                case Map:
                    // map 任务的结果以任务 id 分开
                    for (int i = 0; i < task.getIntermediates().size(); i++) {
                        List<String> filePath = this.intermediates.getOrDefault(i, new ArrayList<>());
                        filePath.add(task.getIntermediates().get(i));
                        this.intermediates.put(i, filePath);
                    }

                    // 所有任务完成才进入 reduce 阶段
                    if(allTaskDone()){
                        createReduceTask();
                        this.masterPhase = TaskStateEnum.Reduce;
                    }
                    break;
                case Reduce:
                    // reduce 所有任务完成就退出
                    if(allTaskDone()){
                        this.masterPhase = TaskStateEnum.Exit;
                    }
                    break;
            }
        }
    }

整个流程与论文描述的流程基本类似,就是一个很经典的 master、slave 模式,mater 维护任务的元数据信息,负责分发任务;slave 负责处理任务来汇报任务结果。

当然,为了简单其实可以根据 springboot + 数据库的模式,springboot 负责写 master、slave 应用,数据库负责记录任务信息,这样会大大简化工作量,且更易于在生产中应用。

还有,关于重复的任务怎么办?直接抛弃后面一个,因为master代码中关于任务完成的代码中有这样一段处理,如果不是此阶段的任务或者任务已经完成,则直接结束。

3、参考资料

mapreduce 论文
https://github.com/s09g/mapreduce-go

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 210,914评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 89,935评论 2 383
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,531评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,309评论 1 282
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,381评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,730评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,882评论 3 404
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,643评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,095评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,448评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,566评论 1 339
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,253评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,829评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,715评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,945评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,248评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,440评论 2 348

推荐阅读更多精彩内容