Python多进程COPY PostgreSql表实验

背景

因为需要进行代码优化。所以进行数据表的整表COPY
一直很好奇,多进程对于copy是否有优化呢?于是做了一些实验。
实验环境:32核I5的服务器。内存200G

实验一: 两个进程 分开copy两个表

Python代码如下

dsn = 'postgresql://postgres:pset123456@192.168.10.10/CHN_NAVINFO_2016Spr_0082_0002_108'

conn1 = psycopg2.connect(dsn=dsn)
conn2 = psycopg2.connect(dsn=dsn)

io1 = open('rdb_node.csv', 'w')
io2 = open('rdb_node_with_all_attri_view.csv', 'w')


sql1 = """copy (select * from rdb_node order by node_id_t, node_id) to STDOUT delimiter '|' csv header"""
sql2 = """copy (select * from rdb_node_with_all_attri_view order by node_id_t, node_id) to STDOUT delimiter '|' csv header"""


def table_size(table_name, c):
    cur = c.cursor()
    cur.execute("select pg_size_pretty(pg_relation_size('%s'));" % table_name)
    s = cur.fetchone()[0]
    cur.close()
    return s

print 'rdb_node size:', table_size('rdb_node', conn1)
print 'rdb_node_with_all_attri_view:', table_size('rdb_node_with_all_attri_view', conn1)


def work(conn, sql, io):
    ss = time.time()
    cur = conn.cursor()
    cur.copy_expert(sql, io)
    print 'PID {} cost: {}'.format(multiprocessing.current_process().pid, time.time() - ss)


multiprocessing.Process(target=work, args=(conn1, sql1, io1)).start()
multiprocessing.Process(target=work, args=(conn2, sql2, io2)).start()

结果

multi Process COPY multi table
rdb_node size: 2559 MB
rdb_node_with_all_attri_view: 2073 MB
PID 18489 cost: 69.7677941322
PID 18490 cost: 75.4461951256

实验二: 一个进程copy两个表

Python代码如下

dsn = 'postgresql://postgres:pset123456@192.168.10.10/CHN_NAVINFO_2016Spr_0082_0002_108'
conn = psycopg2.connect(dsn=dsn)
io1 = open('rdb_node.csv', 'w')
io2 = open('rdb_node_with_all_attri_view.csv', 'w')
sql1 = """copy (select * from rdb_node order by node_id_t, node_id) to STDOUT delimiter '|' csv header"""
sql2 = """copy (select * from rdb_node_with_all_attri_view order by node_id_t, node_id) to STDOUT delimiter '|' csv header"""


def table_size(table_name, c):
    cur = c.cursor()
    cur.execute("select pg_size_pretty(pg_relation_size('%s'));" % table_name)
    s = cur.fetchone()[0]
    cur.close()
    return s

print 'rdb_node size:', table_size('rdb_node', conn)
print 'rdb_node_with_all_attri_view:', table_size('rdb_node_with_all_attri_view', conn)

s = time.time()
cur1 = conn.cursor()
cur1.copy_expert(sql1, io1)
cur1.close()
cur2 = conn.cursor()
cur2.copy_expert(sql2, io2)


print 'cost:', time.time() - s

cur2.close()
conn.close()
io1.close()
io2.close()

结果

one Process COPY multi table
rdb_node size: 2559 MB
rdb_node_with_all_attri_view: 2073 MB
cost: 92.9935839176

实验三:多进程访问单表

代码

def main(cpu_count):
    process_num = cpu_count - 1
    dsn = 'postgresql://postgres:pset123456@192.168.10.10/CHN_NAVINFO_2016Spr_0082_0002_108'

    sql = """
    copy (select * from rdb_node where node_id_t >= {} and node_id_t <= {} order by node_id_t) to STDOUT delimiter '|' csv header 
    """

    init_conn = psycopg2.connect(dsn)
    init_cursor = init_conn.cursor()
    init_cursor.execute('select node_id_t from rdb_node order by node_id_t')
    tile_id_list = [row[0] for row in init_cursor.fetchall()]

    def table_size(table_name, c):
        cur = c.cursor()
        cur.execute("select pg_size_pretty(pg_relation_size('%s'));" % table_name)
        s = cur.fetchone()[0]
        cur.close()
        return s

    print 'rdb_node size:', table_size('rdb_node', init_conn)
    print 'process num:', process_num

    init_cursor.close()
    init_conn.close()

    conn_pool = []
    for i in range(0, process_num):
        conn_pool.append(psycopg2.connect(dsn=dsn))

    io_pool = []
    for i in range(0, process_num):
        io_pool.append(open('b{}.csv'.format(i), 'w'))
    tile_range = []
    for i in xrange(process_num):
        start_index = (len(tile_id_list) / process_num) * i
        if i == process_num - 1:
            end_index = len(tile_id_list) - 1
        else:
            end_index = (len(tile_id_list) / process_num) * (i+1) -1

        if len(tile_range) > 0 and tile_id_list[end_index] == tile_range[-1][-1]:
                tile_range.append((tile_id_list[start_index], tile_id_list[end_index]+1))
        else:
            tile_range.append((tile_id_list[start_index], tile_id_list[end_index] + 1))

    def work(conn, io, r):
        s = time.time()
        cur = conn.cursor()
        cur.copy_expert(sql.format(*r), io)
        io.close()
        cur.close()
        conn.close()
        print 'PID {} cost: {}'.format(multiprocessing.current_process().pid, time.time() - s)

    process_pool = []
    for i in xrange(0, process_num):
        process_pool.append(multiprocessing.Process(target=work, args=(conn_pool[i], io_pool[i], tile_range[i])))

    for p in process_pool:
        p.start()

    for p in process_pool:
        p.join()


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('cpu_core', type=int)
    args = parser.parse_args()
    main(args.cpu_core)
    time.sleep(5)

结果

multi Process COPY one table
rdb_node size: 2559 MB
process num: 30
PID 18394 cost: 29.2145748138
...
PID 18383 cost: 38.1118938923

process num: 15
PID 18409 cost: 15.162913084
...
PID 18416 cost: 38.1842360497

process num: 8
PID 18422 cost: 30.9688498974
...
PID 18424 cost: 44.9193379879

process num: 2
PID 18443 cost: 44.0086810589
PID 18442 cost: 44.4861030579

process num: 1
PID 18448 cost: 44.115489006

结论

  • 多进程查询单表,优化的时间主要在于,查询出结果之后由PYTHON转移数据到内存中,其他进程可以进行查询,相当于做了并发。但是在数据库查询层面并没有优化太多。所以认为单表查询进程不应该超过3个,否则进程资源消耗太大

  • 多进程查询多表,优化时间没有想象中那么大。但是还是有优化的,所以要用起来。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,810评论 25 707
  • 第一章 Nginx简介 Nginx是什么 没有听过Nginx?那么一定听过它的“同行”Apache吧!Ngi...
    JokerW阅读 32,650评论 24 1,002
  • http://python.jobbole.com/85231/ 关于专业技能写完项目接着写写一名3年工作经验的J...
    燕京博士阅读 7,557评论 1 118
  • 又一年暑假即将结束,和侄子既快乐又“痛苦”的相处就这样到了倒计时的最后一天。没有往日的闹别扭,却有了出奇的黏人――...
    娴哥_252c阅读 613评论 0 2
  • 雨在窗外淅沥沥的下着,街道上的行人都举起了随身的雨伞,红的、白的、黑的、蓝的、还有几个带有图案的。离近一看原来上面...
    墨小凝阅读 376评论 0 2