MapReduce
MapReduce is basically the Unix tools in distributed form; if it ran on a single machine, it would just be a Unix pipeline.
In Hadoop's implementation of MapReduce, the underlying filesystem is called HDFS (Hadoop Distributed File System), an open source reimplementation of the Google File System (GFS).
Various other distributed filesystems besides HDFS exist, such as GlusterFS and the Quantcast File System (QFS) [20]. Object storage services such as Amazon S3, Azure Blob Storage, and OpenStack Swift [21] are similar in many ways.
Job Execution
MapReduce has two callback functions: the mapper and the reducer.
Mapper
- The mapper is called once for every input record, and its job is to extract the key and value from the input record. For each input, it may generate any number of key-value pairs (including none). It does not keep any state from one input record to the next, so each record is handled independently.
Reducer
- The MapReduce framework takes the key-value pairs produced by the mappers, collects all the values belonging to the same key, and calls the reducer with an iterator over that collection of values. The reducer can produce output records (such as the number of occurrences of the same URL).
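To make the two callbacks concrete, here is a minimal single-process Python sketch of the programming model (this is not the Hadoop API; the log-line layout, field positions, and the tiny driver that simulates the framework's grouping are assumptions for illustration), counting occurrences of each URL:

```python
from collections import defaultdict
from typing import Iterable, Iterator, Tuple

# Hypothetical log line format assumed here: "<timestamp> <url> <status>"
def mapper(record: str) -> Iterator[Tuple[str, int]]:
    # Called once per input record; emits zero or more key-value pairs.
    fields = record.split()
    if len(fields) >= 2:
        yield fields[1], 1          # key = URL, value = 1

def reducer(key: str, values: Iterable[int]) -> Iterator[Tuple[str, int]]:
    # Called once per distinct key, with an iterator over all of its values.
    yield key, sum(values)

# The framework's job (simulated here): group mapper output by key,
# then invoke the reducer once per key.
def run_job(records):
    groups = defaultdict(list)
    for record in records:
        for k, v in mapper(record):
            groups[k].append(v)
    for k in sorted(groups):
        yield from reducer(k, groups[k])

print(list(run_job([
    "2024-01-01T00:00:00 /index.html 200",
    "2024-01-01T00:00:01 /about.html 200",
    "2024-01-01T00:00:02 /index.html 404",
])))   # [('/about.html', 1), ('/index.html', 2)]
```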
Distributed execution of MapReduce
![[DDIA-Figure-10-1.png]]
To increase locality, each mapper is copied to the machine that stores the data it is going to process: if the mapper is written in Java, the JAR file is copied to that machine and the mapper runs there, which avoids copying the input files over the network.
The whole process amounts to partitioning the map work across different machines.
The reducers are partitioned too: the MapReduce framework uses a hash of the key to decide which reducer a given key-value pair goes to.
The key-value pairs have to be sorted, but the dataset is too large to sort on a single machine, so each mapper first sorts its own output and writes it to local disk. Once a mapper has read all of its input and written its sorted output files, the MapReduce scheduler notifies the reducers that they can start fetching data from that mapper; each reducer fetches the sorted key-value pairs that belong to it and merges them (the right half of the figure), so the result stays sorted.
Finally, the reducer runs its own logic (analyzing the data, counting, sorting from largest to smallest, and so on) and writes its output to the distributed filesystem.
The key-value pairs must be sorted, but the dataset is likely too large to be sorted with a conventional sorting algorithm on a single machine. Instead, the sorting is performed in stages. First, each map task partitions its output by reducer, based on the hash of the key. Each of these partitions is written to a sorted file on the mapper's local disk, using a technique similar to what we discussed in “SSTables and LSM-Trees” on page 76.
Whenever a mapper finishes reading its input file and writing its sorted output files, the MapReduce scheduler notifies the reducers that they can start fetching the output files from that mapper. The reducers connect to each of the mappers and download the files of sorted key-value pairs for their partition. The process of partitioning by reducer, sorting, and copying data partitions from mappers to reducers is known as the shuffle [26] (a confusing term—unlike shuffling a deck of cards, there is no randomness in MapReduce).
The reduce task takes the files from the mappers and merges them together, preserving the sort order. Thus, if different mappers produced records with the same key, they will be adjacent in the merged reducer input.
The reducer is called with a key and an iterator that incrementally scans over all records with the same key (which may in some cases not all fit in memory). The reducer can use arbitrary logic to process these records, and can generate any number of output records. These output records are written to a file on the distributed filesystem (usually, one copy on the local disk of the machine running the reducer, with replicas on other machines).
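As a rough sketch of the shuffle just described (again plain Python rather than Hadoop internals; the number of reducers and the choice of MD5 as the partitioning hash are illustrative assumptions):

```python
import hashlib
import heapq

NUM_REDUCERS = 4   # configurable in a real job

def partition_for(key: str) -> int:
    # A deterministic hash, so every pair with the same key goes to the same reducer.
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % NUM_REDUCERS

def map_side(mapper_output):
    # Each mapper partitions its output by reducer and sorts each partition,
    # then writes the sorted runs to its local disk (kept in memory here).
    runs = [[] for _ in range(NUM_REDUCERS)]
    for key, value in mapper_output:
        runs[partition_for(key)].append((key, value))
    return [sorted(run) for run in runs]

def reduce_side(sorted_runs_from_all_mappers):
    # Reducer r fetches its run from every mapper and merges them,
    # preserving the sort order, so equal keys end up adjacent.
    return heapq.merge(*sorted_runs_from_all_mappers)
```

Reducer r would then merge the r-th run from every mapper, e.g. `reduce_side([map_side(out)[r] for out in all_mapper_outputs])`.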
A single MapReduce job can only do so much on its own; complex analyses need multiple jobs chained together, which forms a workflow. A job has to wait until the previous jobs have finished before it can run, because each job writes its output to files that the next job reads.
Hadoop itself has no built-in workflow scheduler, so several external schedulers have been developed.
A batch job’s output is only considered valid when the job has completed successfully (MapReduce discards the partial output of a failed job). Therefore, one job in a workflow can only start when the prior jobs—that is, the jobs that produce its input directories—have completed successfully. To handle these dependencies between job executions, various workflow schedulers for Hadoop have been developed, including Oozie, Azkaban, Luigi, Airflow, and Pinball [28].
Workflow schedulers can also help manage jobs, so that different teams in a company can consume each other's MapReduce output.
These schedulers also have management features that are useful when maintaining a large collection of batch jobs. Workflows consisting of 50 to 100 MapReduce jobs are common when building recommendation systems [29], and in a large organization, many different teams may be running different jobs that read each other’s output. Tool support is important for managing such complex dataflows.
I think I should find out which open source tool Ian uses and read some of its guides directly… or search for hands-on tutorials for these tools and start using them. That should save a lot of the time I'd otherwise need for class.
Various higher-level tools for Hadoop, such as Pig [30], Hive [31], Cascading [32], Crunch [33], and FlumeJava [34], also set up workflows of multiple MapReduce stages that are automatically wired together appropriately.
In the context of batch processing, a join means resolving all occurrences of some association within a dataset.
![[DDIA-Figure-10-2.png]]
This is the same idea as a database join: bringing together all the data related to each user profile. But if every record triggered a database query for the related data, the database would be overwhelmed, so to achieve good throughput in a batch process, the computation must be (as much as possible) local to one machine.
In order to achieve good throughput in a batch process, the computation must be (as much as possible) local to one machine. Making random-access requests over the network for every record you want to process is too slow. Moreover, querying a remote database would mean that the batch job becomes nondeterministic, because the data in the remote database might change.
So a better solution is to take a copy of the user database via an ETL process, and distribute each user's profile to the relevant machines according to how the user activity events are distributed; that gives you locality.
Thus, a better approach would be to take a copy of the user database (for example, extracted from a database backup using an ETL process—see “Data Warehousing” on page 91) and to put it in the same distributed filesystem as the log of user activity events. You would then have the user database in one set of files in HDFS and the user activity records in another set of files, and could use MapReduce to bring together all of the relevant records in the same place and process them efficiently.
So the mappers ensure locality, which improves the reducers' throughput.
In a sort-merge join, the mappers and the sorting process make sure that all the necessary data to perform the join operation for a particular user ID is brought together in the same place: a single call to the reducer.
This is a nice way to look at it: the key is like an address, and the reducer is the destination that the value is delivered to.
One way of looking at this architecture is that mappers “send messages” to the reducers. When a mapper emits a key-value pair, the key acts like the destination address to which the value should be delivered. Even though the key is just an arbitrary string (not an actual network address like an IP address and port number), it behaves like an address: all key-value pairs with the same key will be delivered to the same destination (a call to the reducer).
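To make the sort-merge join and the "key as destination address" view concrete, here is a Python sketch of a reduce-side join of activity events with user profiles; the record fields (user_id, url, date_of_birth) follow the book's example, but the exact layout is an assumption:

```python
def profile_mapper(profile):
    # From the user database: key = user_id, value tagged as a profile record.
    yield profile["user_id"], ("profile", profile["date_of_birth"])

def activity_mapper(event):
    # From the activity log: key = user_id, value tagged as an event record.
    yield event["user_id"], ("event", event["url"])

def join_reducer(user_id, values):
    # All values for one user_id are delivered to this single reducer call.
    # (A real job would use a secondary sort so the profile arrives first;
    # here we simply buffer everything and emit at the end.)
    date_of_birth, urls = None, []
    for tag, payload in values:
        if tag == "profile":
            date_of_birth = payload
        else:
            urls.append(payload)
    for url in urls:
        yield url, date_of_birth   # e.g. input for a viewed-URLs-by-age analysis
```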
The MapReduce model decouples network communication from computation: the framework delivers the data, and the reducer does the computation.
This differs from the usual database-backed application in that it separates fetching the data from processing it; a typical application first fetches data and then processes it (my movie project works that way). MapReduce handles all the networking itself, so the application code doesn't have to worry about partial failures.
Using the MapReduce programming model has separated the physical network communication aspects of the computation (getting the data to the right machine) from the application logic (processing the data once you have it). This separation contrasts with the typical use of databases, where a request to fetch data from a database often occurs somewhere deep inside a piece of application code [36]. Since MapReduce handles all network communication, it also shields the application code from having to worry about partial failures, such as the crash of another node: MapReduce transparently retries failed tasks without affecting the application logic.
Locality is actually a recurring theme: whether in memory, on disk, or in distributed processing as here, locality is always the first optimization to reach for (network caches exist for the same reason: to increase locality).
Regional datacenters are also about locality (a nearby datacenter means lower latency).
GROUP BY
Besides joins, another common use of the “bringing related data to the same place” pattern is grouping records by some key (as in the GROUP BY clause in SQL).
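The mechanics are the same as for a join: the mapper emits the grouping key, and the framework delivers all records with that key to one reducer call. A small illustrative sketch (field names are assumptions), grouping activity events per user:

```python
def mapper(event):
    # The grouping key, like SQL's GROUP BY user_id.
    yield event["user_id"], event["timestamp"]

def reducer(user_id, timestamps):
    ts = sorted(timestamps)
    # Per-group aggregation, e.g. number of events and the time span of activity.
    yield user_id, {"events": len(ts), "first": ts[0], "last": ts[-1]}
```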
Map-Side joins
If the datasets are very large, every mapper has to ship its output to the corresponding reducers, and this shuffle can become the bottleneck; map-side joins were developed as an optimization for this case.
A map-side join is simply one where the mapper performs the join itself (there are no reducers and no sorting).
Broadcast hash joins
The idea is to load the small dataset entirely into memory as a hash table; the large dataset is partitioned, so each mapper handles one partition of the large input and joins each of its records against the in-memory hash table. Lookups against the small dataset are fast because it is in memory, and it is called "broadcast" because the whole small dataset is broadcast to every mapper partition of the large input.
This simple but effective algorithm is called a broadcast hash join: the word broadcast reflects the fact that each mapper for a partition of the large input reads the entirety of the small input (so the small input is effectively “broadcast” to all partitions of the large input), and the word hash reflects its use of a hash table. This join method is supported by Pig (under the name “replicated join”), Hive (“MapJoin”), Cascading, and Crunch. It is also used in data warehouse query engines such as Impala [41].
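A sketch of a broadcast hash join, under the assumption that the small input is a newline-delimited JSON file of user profiles (the file format and field names are illustrative, not a real Hadoop API):

```python
import json

def load_small_input(path):
    # Every mapper loads the ENTIRE small dataset into an in-memory hash table;
    # this is the "broadcast" and the "hash" of the name.
    table = {}
    with open(path) as f:
        for line in f:
            rec = json.loads(line)
            table[rec["user_id"]] = rec
    return table

def mapper(large_input_partition, small_table):
    # Each mapper scans only its own partition of the large input and joins
    # each record with a local in-memory lookup -- no shuffle, no reducers.
    for event in large_input_partition:
        profile = small_table.get(event["user_id"])
        if profile is not None:
            yield event["user_id"], (event["url"], profile["date_of_birth"])
```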
Partitioned hash joins
If both inputs are partitioned in the same way, with the same number of partitions as there are mappers, each mapper can simply be assigned the corresponding partition of each input; this is a partitioned hash join.
For example, if the last digit of the user ID has only 10 possible values (0-9), you can split the work into 10 mapper partitions, with each mapper processing only one of them, so each one reads far less data.
If the partitioning is done correctly, you can be sure that all the records you might want to join are located in the same numbered partition, and so it is sufficient for each mapper to only read one partition from each of the input datasets. This has the advantage that each mapper can load a smaller amount of data into its hash table.
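A sketch of a partitioned hash join for the last-digit example above; it assumes both datasets were already partitioned by the same key into the same number of partitions:

```python
NUM_PARTITIONS = 10   # e.g. partition by the last decimal digit of the user ID

def partition_of(user_id: int) -> int:
    return user_id % NUM_PARTITIONS

def partitioned_hash_join(profiles_partition_i, events_partition_i):
    # Mapper i reads only partition i of EACH input, so its hash table holds
    # just 1/NUM_PARTITIONS of the user database instead of all of it.
    table = {p["user_id"]: p for p in profiles_partition_i}
    for event in events_partition_i:
        profile = table.get(event["user_id"])
        if profile is not None:
            yield event["user_id"], (event["url"], profile["date_of_birth"])
```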
When I read this I thought: isn't that just the output of a previous job? Indeed; so running a map-only job like this can be done for the benefit of other jobs later on; essentially you are preparing datasets that can be consumed directly.
If a map-side merge join is possible, it probably means that prior MapReduce jobs brought the input datasets into this partitioned and sorted form in the first place. In principle, this join could have been performed in the reduce stage of the prior job. However, it may still be appropriate to perform the merge join in a separate map-only job, for example if the partitioned and sorted datasets are also needed for other purposes besides this particular join.
Purposes of batch processing
Build search indexes
MapReduce was originally created at Google to build its search index [[DDIA要读的paper#^6bad9b]], although Google later moved away from it for that purpose [[DDIA要读的paper#^721545]]
Lucene/Solr apparently still use MapReduce to build indexes [[DDIA要读的paper#^5a0230]]
Build machine learning systems
Search indexes are just one example of the possible outputs of a batch processing workflow. Another common use for batch processing is to build machine learning systems such as classifiers (e.g., spam filters, anomaly detection, image recognition) and recommendation systems (e.g., people you may know, products you may be interested in, or related searches [29]).
So Hadoop is infrastructure for machine learning.
MapReduce's model of never modifying its input data sets it apart from databases, because it can tolerate buggy code; that is,
- Databases with read-write transactions do not have this property: if you deploy buggy code that writes bad data to the database, then rolling back the code will do nothing to fix the data in the database. (The idea of being able to recover from buggy code has been called human fault tolerance [50].)
This passage in the book is very useful and worth rereading (page 414):
- Like Unix tools, MapReduce jobs separate logic from wiring (configuring the input and output directories), which provides a separation of concerns and enables potential reuse of code: one team can focus on implementing a job that does one thing well, while other teams can decide where and when to run that job.
Hadoop actually uses Avro?! Then I really need to learn how to use Avro, or Thrift; I think y总 has a Thrift tutorial.
On Hadoop, some of those low-value syntactic conversions are eliminated by using more structured file formats: Avro (see “Avro” on page 122) and Parquet (see “Column-Oriented Storage” on page 95) are often used, as they provide efficient schema-based encoding and allow evolution of their schemas over time (see Chapter 4).
MapReduce's distinguishing feature is that it is general purpose: it can run arbitrary programs.
When the MapReduce paper [1] was published, it was—in some sense—not at all new. All of the processing and parallel join algorithms that we discussed in the last few sections had already been implemented in so-called massively parallel processing (MPP) databases more than a decade previously [3, 40]. For example, the Gamma database machine, Teradata, and Tandem NonStop SQL were pioneers in this area [52].
The biggest difference is that MPP databases focus on parallel execution of analytic SQL queries on a cluster of machines, while the combination of MapReduce and a distributed filesystem [19] provides something much more like a general-purpose operating system that can run arbitrary programs.
So HDFS is a very good fit for storage, especially when requirements vary (when you need both OLTP and OLAP on the same data).
Overall, MapReduce is suited to very large datasets.
The Hadoop ecosystem includes both random-access OLTP databases such as HBase (see “SSTables and LSM-Trees” on page 76) and MPP-style analytic databases such as Impala [41]. Neither HBase nor Impala uses MapReduce, but both use HDFS for storage. They are very different approaches to accessing and processing data, but they can nevertheless coexist and be integrated in the same system.
Materialization
Writing MapReduce's intermediate state out to files is called materialization.
The process of writing out this intermediate state to files is called materialization.
But this fully materialized intermediate state is sometimes unnecessary, which is why dataflow engines were developed.
Dataflow engines
In order to fix these problems with MapReduce, several new execution engines for distributed batch computations were developed, the most well known of which are Spark [61, 62], Tez [63, 64], and Flink [65, 66].
Since they explicitly model the flow of data through several processing stages, these systems are known as dataflow engines.
This sentence sums up the difference between dataflow engines and MapReduce very well:
Returning to the Unix analogy, we saw that MapReduce is like writing the output of each command to a temporary file, whereas dataflow engines look much more like Unix pipes.
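For example, in Spark (PySpark shown here; the paths and the log-line format are placeholder assumptions) several processing stages are chained into one DAG, and the intermediate results are not written back to HDFS between stages:

```python
from pyspark import SparkContext

sc = SparkContext(appName="url-counts")   # paths below are placeholders

# Several chained stages; Spark builds a DAG and only materializes the final
# output -- intermediate results are kept in memory or streamed between
# operators, not written to HDFS between stages as separate MapReduce jobs do.
counts = (sc.textFile("hdfs:///logs/access.log")
            .map(lambda line: line.split()[1])       # extract the URL field
            .map(lambda url: (url, 1))
            .reduceByKey(lambda a, b: a + b)
            .filter(lambda kv: kv[1] >= 100))        # keep only popular URLs

counts.saveAsTextFile("hdfs:///output/popular-urls")
```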
Graph and iterative processing
A lot of machine learning fits a graph model, which makes sense… PageRank itself is about ranking web pages.
Dataflow engines like Spark, Flink, and Tez (see “Materialization of Intermediate State” on page 419) typically arrange the operators in a job as a directed acyclic graph (DAG).
Summary
This chapter is mainly about batch processing, starting with Unix tools (and their design philosophy), then MapReduce and its successors, the dataflow engines.
Some of those design principles are that inputs are immutable, outputs are intended to become the input to another (as yet unknown) program, and complex problems are solved by composing small tools that “do one thing well.”
In the Unix world, the uniform interface that allows one program to be composed with another is files and pipes; in MapReduce, that interface is a distributed filesystem. We saw that dataflow engines add their own pipe-like data transport mechanisms to avoid materializing intermediate state to the distributed filesystem, but the initial input and final output of a job is still usually HDFS.
Distributed batch processing mainly has to solve two problems:
- Partitioning
- Fault tolerance
The two main problems that distributed batch processing frameworks need to solve are:
Partitioning
In MapReduce, mappers are partitioned according to input file blocks. The output of mappers is repartitioned, sorted, and merged into a configurable number of reducer partitions. The purpose of this process is to bring all the related data—e.g., all the records with the same key—together in the same place.
Post-MapReduce dataflow engines try to avoid sorting unless it is required, but they otherwise take a broadly similar approach to partitioning.
Fault tolerance
MapReduce frequently writes to disk, which makes it easy to recover from an individual failed task without restarting the entire job but slows down execution in the failure-free case. Dataflow engines perform less materialization of intermediate state and keep more in memory, which means that they need to recompute more data if a node fails. Deterministic operators reduce the amount of data that needs to be recomputed.