Background
As part of a code-optimization effort, we need to COPY entire tables out of the database.
I had long been curious whether using multiple processes speeds up COPY, so I ran a few experiments.
Test environment: a 32-core Intel i5 server with 200 GB of RAM.
Experiment 1: two processes, each copying one table
The Python code (Python 2 throughout):
import multiprocessing
import time

import psycopg2

dsn = 'postgresql://postgres:pset123456@192.168.10.10/CHN_NAVINFO_2016Spr_0082_0002_108'
conn1 = psycopg2.connect(dsn=dsn)
conn2 = psycopg2.connect(dsn=dsn)
io1 = open('rdb_node.csv', 'w')
io2 = open('rdb_node_with_all_attri_view.csv', 'w')
sql1 = """copy (select * from rdb_node order by node_id_t, node_id) to STDOUT delimiter '|' csv header"""
sql2 = """copy (select * from rdb_node_with_all_attri_view order by node_id_t, node_id) to STDOUT delimiter '|' csv header"""


def table_size(table_name, c):
    """Return the pretty-printed on-disk size of a relation."""
    cur = c.cursor()
    cur.execute("select pg_size_pretty(pg_relation_size('%s'));" % table_name)
    s = cur.fetchone()[0]
    cur.close()
    return s

print 'rdb_node size:', table_size('rdb_node', conn1)
print 'rdb_node_with_all_attri_view:', table_size('rdb_node_with_all_attri_view', conn1)


def work(conn, sql, io):
    """COPY one table to its CSV file and report the elapsed time."""
    ss = time.time()
    cur = conn.cursor()
    cur.copy_expert(sql, io)
    print 'PID {} cost: {}'.format(multiprocessing.current_process().pid, time.time() - ss)

# Each connection/file pair is inherited across fork() but used by
# exactly one child, so the two COPYs run fully in parallel.
p1 = multiprocessing.Process(target=work, args=(conn1, sql1, io1))
p2 = multiprocessing.Process(target=work, args=(conn2, sql2, io2))
p1.start()
p2.start()
p1.join()
p2.join()
Results
Two processes, two tables:
rdb_node size: 2559 MB
rdb_node_with_all_attri_view: 2073 MB
PID 18489 cost: 69.7677941322
PID 18490 cost: 75.4461951256
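One caveat about the snippet above: the connections and file handles are created in the parent and inherited across fork(). That works here because each one is used by exactly one child, but psycopg2 connections are not meant to be shared between processes. A more defensive variant (a minimal sketch reusing the same DSN, tables, and SQL as above) opens everything inside the worker:

import multiprocessing
import time

import psycopg2

DSN = 'postgresql://postgres:pset123456@192.168.10.10/CHN_NAVINFO_2016Spr_0082_0002_108'


def copy_table(sql, out_path):
    # Each worker owns its connection and file, created after fork,
    # so no libpq state ever crosses a process boundary.
    conn = psycopg2.connect(dsn=DSN)
    with open(out_path, 'w') as io:
        ss = time.time()
        cur = conn.cursor()
        cur.copy_expert(sql, io)
        cur.close()
    conn.close()
    print 'PID {} cost: {}'.format(multiprocessing.current_process().pid, time.time() - ss)

jobs = [
    ("""copy (select * from rdb_node order by node_id_t, node_id) to STDOUT delimiter '|' csv header""", 'rdb_node.csv'),
    ("""copy (select * from rdb_node_with_all_attri_view order by node_id_t, node_id) to STDOUT delimiter '|' csv header""", 'rdb_node_with_all_attri_view.csv'),
]
procs = [multiprocessing.Process(target=copy_table, args=j) for j in jobs]
for p in procs:
    p.start()
for p in procs:
    p.join()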
Experiment 2: one process copying both tables
The Python code:
import time

import psycopg2

dsn = 'postgresql://postgres:pset123456@192.168.10.10/CHN_NAVINFO_2016Spr_0082_0002_108'
conn = psycopg2.connect(dsn=dsn)
io1 = open('rdb_node.csv', 'w')
io2 = open('rdb_node_with_all_attri_view.csv', 'w')
sql1 = """copy (select * from rdb_node order by node_id_t, node_id) to STDOUT delimiter '|' csv header"""
sql2 = """copy (select * from rdb_node_with_all_attri_view order by node_id_t, node_id) to STDOUT delimiter '|' csv header"""


def table_size(table_name, c):
    """Return the pretty-printed on-disk size of a relation."""
    cur = c.cursor()
    cur.execute("select pg_size_pretty(pg_relation_size('%s'));" % table_name)
    s = cur.fetchone()[0]
    cur.close()
    return s

print 'rdb_node size:', table_size('rdb_node', conn)
print 'rdb_node_with_all_attri_view:', table_size('rdb_node_with_all_attri_view', conn)

# The two COPYs run back to back on the same connection.
s = time.time()
cur1 = conn.cursor()
cur1.copy_expert(sql1, io1)
cur1.close()
cur2 = conn.cursor()
cur2.copy_expert(sql2, io2)
print 'cost:', time.time() - s
cur2.close()
conn.close()
io1.close()
io2.close()
Results
One process, two tables:
rdb_node size: 2559 MB
rdb_node_with_all_attri_view: 2073 MB
cost: 92.9935839176
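Since a single PostgreSQL session executes one statement at a time, the two COPYs here necessarily run end to end, and the 93 s total is simply the two copies in sequence; compare this with the 75.4 s wall clock of the two-process run in experiment 1.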
Experiment 3: multiple processes reading one table
Code
import argparse
import multiprocessing
import time

import psycopg2


def main(cpu_count):
    process_num = cpu_count - 1  # leave one core for the parent process
    dsn = 'postgresql://postgres:pset123456@192.168.10.10/CHN_NAVINFO_2016Spr_0082_0002_108'
    sql = """
    copy (select * from rdb_node where node_id_t >= {} and node_id_t <= {} order by node_id_t) to STDOUT delimiter '|' csv header
    """

    # Fetch the distinct tile ids once, sorted, so that the table can be
    # partitioned into non-overlapping node_id_t ranges.
    init_conn = psycopg2.connect(dsn)
    init_cursor = init_conn.cursor()
    init_cursor.execute('select distinct node_id_t from rdb_node order by node_id_t')
    tile_id_list = [row[0] for row in init_cursor.fetchall()]

    def table_size(table_name, c):
        cur = c.cursor()
        cur.execute("select pg_size_pretty(pg_relation_size('%s'));" % table_name)
        s = cur.fetchone()[0]
        cur.close()
        return s

    print 'rdb_node size:', table_size('rdb_node', init_conn)
    print 'process num:', process_num
    init_cursor.close()
    init_conn.close()

    # One connection and one output file per worker.
    conn_pool = [psycopg2.connect(dsn=dsn) for _ in xrange(process_num)]
    io_pool = [open('b{}.csv'.format(i), 'w') for i in xrange(process_num)]

    # Split the sorted id list into process_num contiguous chunks; each
    # worker copies the inclusive [first, last] range of its chunk. With
    # distinct ids the ranges are disjoint, so no row is exported twice.
    chunk = len(tile_id_list) // process_num
    tile_range = []
    for i in xrange(process_num):
        start_index = chunk * i
        end_index = len(tile_id_list) - 1 if i == process_num - 1 else chunk * (i + 1) - 1
        tile_range.append((tile_id_list[start_index], tile_id_list[end_index]))

    def work(conn, io, r):
        s = time.time()
        cur = conn.cursor()
        cur.copy_expert(sql.format(*r), io)
        io.close()
        cur.close()
        conn.close()
        print 'PID {} cost: {}'.format(multiprocessing.current_process().pid, time.time() - s)

    process_pool = [multiprocessing.Process(target=work, args=(conn_pool[i], io_pool[i], tile_range[i]))
                    for i in xrange(process_num)]
    for p in process_pool:
        p.start()
    for p in process_pool:
        p.join()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('cpu_core', type=int)
    args = parser.parse_args()
    main(args.cpu_core)
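To make the range splitting concrete, here is the same chunking logic as a standalone helper, run on a made-up id list (a toy sketch, not part of the measured code):

def split_ranges(ids, n):
    """Split a sorted list of distinct ids into n inclusive (first, last) ranges."""
    chunk = len(ids) // n
    ranges = []
    for i in range(n):
        start = chunk * i
        end = len(ids) - 1 if i == n - 1 else chunk * (i + 1) - 1
        ranges.append((ids[start], ids[end]))
    return ranges

print split_ranges([10, 11, 13, 17, 20, 21, 30], 3)
# -> [(10, 11), (13, 17), (20, 30)]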
Results
Multiple processes, one table (separate runs with different process counts):
rdb_node size: 2559 MB
process num: 30
PID 18394 cost: 29.2145748138
...
PID 18383 cost: 38.1118938923
process num: 15
PID 18409 cost: 15.162913084
...
PID 18416 cost: 38.1842360497
process num: 8
PID 18422 cost: 30.9688498974
...
PID 18424 cost: 44.9193379879
process num: 2
PID 18443 cost: 44.0086810589
PID 18442 cost: 44.4861030579
process num: 1
PID 18448 cost: 44.115489006
Conclusion
With multiple processes reading a single table, the saving comes mainly from client-side overlap: while Python in one process is draining its result set to memory and disk, the other processes can keep their queries running, which amounts to pipelining the work. At the database query level there is little gain, and the numbers above bear this out: the slowest worker takes about 44 s with 1, 2, or 8 processes and only drops to about 38 s with 15 or 30. So for a single table I would not use more than about 3 processes; past that, the extra processes mostly just consume resources (a sketch of such a capped setup follows below).
With multiple processes copying multiple tables, the speedup is smaller than I expected: 75.4 s wall clock versus 93.0 s sequential, roughly a 19% saving. But it is a real saving, so it is worth using.
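If the three-process cap is adopted, multiprocessing.Pool keeps the bookkeeping small. Below is a sketch under the same assumptions as experiment 3 (same DSN, table, and SQL; the chunking helper is repeated from the toy sketch above); it was not part of the measured runs:

import multiprocessing

import psycopg2

DSN = 'postgresql://postgres:pset123456@192.168.10.10/CHN_NAVINFO_2016Spr_0082_0002_108'
SQL = """copy (select * from rdb_node where node_id_t >= {} and node_id_t <= {} order by node_id_t) to STDOUT delimiter '|' csv header"""


def split_ranges(ids, n):
    # Same chunking as in the toy sketch above.
    chunk = len(ids) // n
    return [(ids[chunk * i],
             ids[len(ids) - 1 if i == n - 1 else chunk * (i + 1) - 1])
            for i in range(n)]


def copy_range(args):
    i, (lo, hi) = args
    conn = psycopg2.connect(dsn=DSN)  # one connection per worker
    with open('b{}.csv'.format(i), 'w') as io:
        cur = conn.cursor()
        cur.copy_expert(SQL.format(lo, hi), io)
        cur.close()
    conn.close()


if __name__ == '__main__':
    conn = psycopg2.connect(dsn=DSN)
    cur = conn.cursor()
    cur.execute('select distinct node_id_t from rdb_node order by node_id_t')
    ids = [row[0] for row in cur.fetchall()]
    conn.close()

    pool = multiprocessing.Pool(3)  # the ~3-process cap from the conclusion
    pool.map(copy_range, list(enumerate(split_ranges(ids, 3))))
    pool.close()
    pool.join()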