python并行框架对比

虽然Python的多处理库已经成功地用于广泛的应用程序中,但是在本文中,我们发现它不适合一些重要的应用程序类,包括数值数据处理、状态计算和具有昂贵初始化的计算。主要有两个原因:

  1. 处理数字数据效率低下
  2. 无法在单独的“任务”之间共享变量
    本文将比较python原生多任务包multiprocessing,joblib包,以及ray包,在不同环境测试他们的并行性能

Ray是一个快速、简单的框架,用于构建和运行解决这些问题的分布式应用程序。有关一些基本概念的介绍,请参阅本文。Ray利用Apache Arrow进行高效的数据处理,并为分布式计算提供任务和参与者抽象。
Joblib是一组在Python中提供轻量级管道的工具,Joblib特别针对大数据进行了快速和健壮的优化,并对numpy数组进行了特定的优化。

1.字符串并行处理

这里引入ray,joblib和multiprocessing Pool, 默认设定为8核运行

# 测试并行性能
import ray
ray.init(num_cpus=8)
import joblib
from multiprocessing import Pool
import pandas as pd
import numpy as np

接下来,我们生成一对长度为80w的字符串数据, 统计相同位置字符的一致率。并统计三个包的并行,首先我们进行16次对比,都使用8核处理。

def compare_string(args):
    string_1, string_2 = args
    same = 0
    for i in range(len(string_1)):
        if string_1[i] == string_2[i]:
            same += 1
    return same
# ray版本的字符串对比, 只是加了一个修饰器
@ray.remote
def compare_string2(args):
    string_1, string_2 = args
    same = 0
    for i in range(len(string_1)):
        if string_1[i] == string_2[i]:
            same += 1
    return same

string_1 = ['0']*800000
string_2 = ['0']*800000
args = [[string_1, string_2] for i in range(16)] # 重复16次
# 把multiprocessing 测试包在一个函数中
def test_pool(func, args):
    pool = Pool(8)
    ret = pool.map(func, args)
    pool.close()
    pool.join()
    return ret
# 测试multiprocessing pool
%timeit ret = test_pool(compare_string, args) 
# 测试joblib Parallel 的loky模式(默认模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(compare_string)(arg) for arg in args)
# 测试joblib Parallel 的multiprocessing模式(多进程模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(compare_string)(arg) for arg in args)
# 测试ray框架并行
%timeit ret = [compare_string2.remote(arg) for arg in args]

结果如下:

# multiprocessing pool平均时间
1.21 s ± 82 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的loky模式 平均时间
42.9 s ± 418 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的multiprocessing模式 平均时间
1.55 s ± 117 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# ray 平均时间
835 ms ± 27.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
字符串时间对比

joblib loky模式直接pass掉,用时太长了,这里字符串对比一共有16次,我们增加10倍看一下其它三组的排名是否还是一致。

string_1 = ['0']*800000
string_2 = ['0']*800000
args = [[string_1, string_2] for i in range(160)] # 重复160次
# 测试multiprocessing pool
%timeit ret = test_pool(compare_string, args) 
# 测试joblib Parallel 的multiprocessing模式(多进程模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(compare_string)(arg) for arg in args)
# 测试ray框架并行
%timeit ret = [compare_string2.remote(arg) for arg in args]
# multiprocessing pool平均时间
2.92 s ± 84.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的multiprocessing模式 平均时间
10.6 s ± 258 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# ray 平均时间
7.92 s ± 241 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
字符串时间对比2
当重复次数变多,并行数(8核)不变时,python原生multiprocessing pool反而更快,因此如果是非数值计算,字符串统计还是建议使用python原生multiprocessing

2.数字并行处理

第二组对比我们进行纯数字计算对比,这里我们测试计算斐波那契数列并行用时

def fib_loop(n):
    a, b = 0, 1
    for i in range(n + 1):
        a, b = b, a + b
    return a
# ray 版本
@ray.remote
def fib_loop2(n):
    a, b = 0, 1
    for i in range(n + 1):
        a, b = b, a + b
    return a
args = [10000]*1600 # 重复计算1600次,每次计算n=10000的斐波那契数列
# 测试multiprocessing pool
%timeit ret = test_pool(fib_loop, args)
# 测试joblib Parallel 的loky模式(默认模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(fib_loop)(arg) for arg in args)
# 测试joblib Parallel 的multiprocessing模式(多进程模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(fib_loop)(arg) for arg in args)
# 测试ray框架并行
%timeit ret = [fib_loop2.remote(arg) for arg in args]
# multiprocessing pool平均时间
742 ms ± 46.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的loky模式 平均时间
782 ms ± 33.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的multiprocessing模式 平均时间
608 ms ± 21.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# ray 平均时间
873 ms ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
数值计算时间对比
可以看出 joblib 和 ray框架都对数值计算进行了优化,joblib的multiprocessing最快,起始这里时间的差异更多的应该是进程通信的误差

3.矩阵运算

上面只是使用了简单的加法操作,这里使用scipy的矩阵运算,看看三种框架对矩阵运算的优化情况

import scipy.signal as s
def scipy_convolve2d(args):
    image, random_filter = args
    return s.convolve2d(image, random_filter)[::5, ::5]
# ray版本
@ray.remote
def scipy_convolve2d2(args):
    image, random_filter = args
    return s.convolve2d(image, random_filter)[::5, ::5]

filters = [np.random.normal(size=(4, 4)) for _ in range(8)]
# 并行参数直接打包为args列表
args = [(np.zeros((4000, 4000)), filters[i]) for i in range(8)]
# multiprocessing pool平均时间
%timeit ret = test_pool(scipy_convolve2d, args)
# joblib Parallel 的loky模式 平均时间
%timeit ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(scipy_convolve2d)(arg) for arg in args)
# joblib Parallel 的multiprocessing模式 平均时间
%timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(scipy_convolve2d)(arg) for arg in args)
#ray平均时间
%timeit ret = ray.get([scipy_convolve2d2.remote(arg) for arg in args])
# multiprocessing pool平均时间
3.36 s ± 143 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的loky模式 平均时间
1.9 s ± 64.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的multiprocessing模式 平均时间
1.38 s ± 45.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
#ray平均时间
1.31 s ± 53.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
多进程矩阵运算平均用时

可以看出ray相对来说是最快的

4.共享内存

joblib和ray相较于原始python多进程的优势的另一个方面就是对内存的优化,对于一个较大的数据,我们只想要其中的一部分,joblib和ray都可以使用共享内存完成相应部分的计算,而不是每一个进程都放入一份独立完整的数据。

我们使用ray和joblib,共享一份pd.DataFrame,并统计每列所有类型的出现次数
# 生成一份大内存的pd.DataFrame
zeros = np.zeros((10000,1000))
ones = np.ones((10000,1000))
df = pd.DataFrame(np.concatenate([zeros, ones], axis=0))
def value_counts(df, i):
    return df.iloc[:,i].value_counts().to_dict()
# ray版本函数
@ray.remote
def value_counts2(df, i):
    return df.iloc[:,i].value_counts().to_dict()
# joblib共享内存函数
import os
import tempfile
def memmap(data):
    tmp_folder = tempfile.mkdtemp()
    tmp_path = tmp_folder + '/joblib.mmap'
    if os.path.exists(tmp_path):  # 若存在则删除
        os.remove(tmp_path)
    _ = joblib.dump(data, tmp_path)
    memmap_data = joblib.load(tmp_path, mmap_mode='r+')
    return memmap_data
shared_df = memmap(df) # joblib 的共享内存方法 shared_df就是共享内存的df

df_id = ray.put(df) # ray共享内存的方法(封装好了更简单一些)
# joblib Parallel 的loky模式
%time ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
# joblib Parallel 的multiprocessing模式
%time ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
# ray
%time ret = [value_counts2.remote(df_id, i) for i in range(df.shape[1])]
# joblib Parallel 的loky模式 跑一次时间
CPU times: user 596 ms, sys: 68 ms, total: 664 ms
Wall time: 1.23 s
# joblib Parallel 的multiprocessing 跑一次时间
CPU times: user 152 ms, sys: 64 ms, total: 216 ms
Wall time: 611 ms
# ray 跑一次时间
CPU times: user 352 ms, sys: 64 ms, total: 416 ms
Wall time: 784 ms
共享内存时间对比
结果还是joblib的multiprocessing模式最快,不过时间差距应该不大

这里df实际上是个数值矩阵,如果将其变为字符串格式,速度会不会下降呢?
我们将df维度降低变为100维,数值类型变为str

# pandas处理文件,统计
zeros = np.zeros((10000,100))
ones = np.ones((10000,100))
df = pd.DataFrame(np.concatenate([zeros, ones], axis=0))
df = df.astype(str) # 转为字符串
# joblib Parallel 的loky模式
%time ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
# joblib Parallel 的multiprocessing模式
%time ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
# ray
%time ret = [value_counts2.remote(df_id, i) for i in range(df.shape[1])]
joblib Parallel 的loky模式 跑一次时间
CPU times: user 57.7 s, sys: 3.33 s, total: 1min 1s
Wall time: 1min 1s
# joblib Parallel 的multiprocessing模式 跑一次时间
CPU times: user 52.9 s, sys: 3.21 s, total: 56.1 s
Wall time: 56.7 s
# ray 跑一次时间
CPU times: user 256 ms, sys: 76 ms, total: 332 ms
Wall time: 4.7 s
共享内存时间测试2
令人吃惊的是将DataFrame int类型转为str类型后,ray框架并行计算时间惊人的减少,很有可能在上次对比中数值类型并不能时间反映出两种框架对共享内存的使用效率。

4.总结

总的来说这三个包在一些小人物中并行时间上面差异并不大
ray和joblib都对数值计算进行了优化
在处理pandas共享数据时ray的优势更明显

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,372评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,368评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,415评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,157评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,171评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,125评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,028评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,887评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,310评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,533评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,690评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,411评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,004评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,812评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,693评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,577评论 2 353

推荐阅读更多精彩内容