简介
代码的易读性对于编写高质量软件非常重要,例如:清晰的接口及函数定义,统一的代码规范等。Python的定位就是简单而优雅,如何让Python代码写得更高效、更易度、更符合Python编程风格显得尤为重要。本文以实例的方式介绍如何以Python语言的最佳方式来编写简洁直观且易读的代码。
示例
用列表推导代替map和filter
可以通过列表推导(list comprehension,简写为LC)根据一份列表(list)生成另一份列表,同时列表推导比for循环和map函数具有更高的效率。
- LC vs map:如例所示,计算列表中每个数字的平方
a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# List comprehension
squares = [x**2 for x in a]
print(squares)
# >>> [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
# Map
squares = map(lambda x: x ** 2, a)
print(list(squares))
# >>> [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
- LC vs filter:如例所示,计算列表中可以被2整除的数字的平方
a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# List comprehension
even_squares = [x**2 for x in a if x % 2 == 0]
print(even_squares)
# >>> [4, 16, 36, 64, 100]
# filter
alt = map(lambda x: x**2, filter(lambda x: x % 2 == 0, a))
print (list(alt))
# >>> [4, 16, 36, 64, 100]
- LC for dict and set:字典和集合的列表表达式
chile_ranks = {'ghost': 1, 'habanero': 2, 'cayenne': 3}
rank_dict = {rank: name for name, rank in chile_ranks.items()}
print(rank_dict)
# >>> {1: 'ghost', 2: 'habanero', 3: 'cayenne'}
chile_len_set = {len(name) for name in rank_dict.values()}
print(chile_len_set)
# >>> {8, 5, 7}
要点:
- 列表推导比内置的map和filter函数更清晰,因为无需额外的lambda表达式。
- 列表推导可以跳过输入列表中的某些元素,如果使用map,就需要filter函数来辅助实现。
- 字典(dictionary)与集(set)也支持推导表达式。
数据量较大时用生成器表达式代替列表推导
列表推导(list comprehension,简写为LC)会给输入列表中的每一个值都创建一个新的只包含一个元素的列表。当输入数据较少时可能是很好用的,但是如果输入的数据非常庞大,可能会消耗大量内存,甚至导致程序崩溃。
如例所示,读取一个文件并且返回每行的字符数。如果使用列表推导,需要把文件每一行的长度都保存在内存中,如果这个文件很大或者数据来自一个可能永远不会关闭的网络套接字数据流,此时列表推导就会出问题。
# list comprehension
value = [len(x) for x in open('my_file.txt')]
print(value)
# >>> [77, 99, 21, 78, 29, 19, 73, 59, 16, 24]
上面的代码只适合处理少量的输入值,为了解决处理大数据的问题,Python提供了生成器表达式(generator expression),它是对列表推导和生成器的一种泛化。生成器表达式在运行过程中,不会把整个输出序列都呈现出来,而是通过迭代器每次根据生成器表达式产生一项数据。
把实现列表推导的语法放在一对()中,就构成了生成器表达式,返回的是一个迭代器,而不是具体内容。通过调用内置的next()函数,即可使其按照生成器表达式来输出下一个值,可以根据需要来操作数据,而无需担心内存不够用。
# Generator expression
it = (len(x) for x in open('my_file.txt'))
print(it)
# >>> <generator object <genexpr> at 0x0458CA30>
print(next(it))
# >>> 77
print(next(it))
# >>> 99
生成器表达式的另一个强大功能就在于其可以组合使用,如例所示,将刚才生成器表达式返回的迭代器用作另一个生成器表达式的输入值。
roots = ((x, x**0.5) for x in it)
print(next(roots))
# >>> (21, 4.58257569495584)
要点:
- 当输入数据量较大时,列表推导可能会因为占用过多内存而导致一些问题。
- 生成器表达式通过迭代的方式逐次产生输出项,可以防止出现内存危机。
- 把某个生成器表达式所返回的迭代器,放到另一个生成器表达式中,执行速度很快。
用enumerate代替range
可以使用for循环加range()函数的方式来迭代一个集合,同时获得当前值在列表中的下标,另外Python也提供了一个枚举函数enumerate()来包装任何的迭代器,函数返回的一对字段代表了迭代器中当前项的下标及当前值,通过使用enumerate(),代码显得更加干净整洁,并且效率更高。
flavor_list = ['vanilla', 'chocolate', 'pecan', 'strawberry']
# range()
for i in range(len(flavor_list)):
flavor = flavor_list[i]
print('%d: %s' % (i + 1, flavor))
'''
>>>
1: vanilla
2: chocolate
3:pecan
4:strawberry
'''
# enumerate()
for i, flavor in enumerate(flavor_list):
print('%d: %s' % (i + 1, flavor))
'''
>>>
1: vanilla
2: chocolate
3:pecan
4:strawberry
'''
要点:
- enumerate()提供了简洁的语法,迭代时既能获取列表的下标,也能获取当前值。
- 在索引化一个序列的时候,应该避免使用range(),而应该使用enumerate()。
- 可以设置enumerate()的第二个参数来指定索引开始的序号,默认为0。
使用zip同时遍历多个序列
可以使用zip()函数同时遍历多个序列,每次分别从一个序列中取一个元素。
headers = ['name', 'shares', 'price']
values = ['ACME', 100, 490.1]
s = dict(zip(headers,values))
print(s)
# >>> {'name': 'ACME', 'shares': 100, 'price': 490.1}
a = [1, 2, 3]
b = ['w', 'x', 'y', 'z']
for i in zip(a,b):
print(i)
'''
>>>
(1, 'w')
(2, 'x')
(3, 'y')
'''
要点:
- zip()可以并行地遍历多个迭代器。
- Python 3中的zip()在遍历过程中返回元组,而Python 2中的zip()返回一份包含所有元祖的列表。
- zip()的迭代长度跟参数中最短序列长度一致。
使用try/except/else/finally结构
Python程序的异常处理需要考虑四种不同的情况,这些情况可以被try/except/else/finally块进行处理。
- try: 负责捕捉异常
- except: 负责处理异常
- finally: 负责清理工作,无论异常是否发生,都会被执行。
- else: 如果没有发生异常,就会被执行。else块可以使得try块中的代码变得更加的简洁,提升代码的可读性。
def divide(a, b):
try:
result = a / b
except ZeroDivisionError as e:
print("Error: " + str(e))
return None
else:
print("Result is %.1f" % result)
return result
finally:
print("finally")
divide(2, 5)
'''
>>>
Result is 0.4
finally
'''
divide(2, 0)
'''
>>>
Error: division by zero
finally
'''
函数发生异常时返回用异常代替None
在某些情况下函数返回None似乎很符合逻辑,例如:在计算两数相除的函数中,在除数为零时返回None。
def divide(a, b):
try:
return a / b
except ZeroDivisionError:
return None
调用这个函数的代码就可以根据返回的结果来作进一步的处理,但是在一些条件表达式中,None和一些特殊值(0,空字符串等)都会返回False,使用None作为返回值容易让调用者犯错。
nums = [(1, 0), (0, 1)]
for x, y in nums:
result = divide(x, y)
if not result:
print('Invalid inputs')
else:
print('Result is %.1f' % result)
'''
>>>
Invalid inputs
Invalid inputs
'''
更好的方法就是永远不返回None值,而是在发生异常时上抛给调用方,让调用方来处理。
def divide(a, b):
try:
return a / b
except ZeroDivisionError as e:
raise ValueError('Invalid inputs') from e
现在,调用方需要自行处理由于输入值存在问题而引发的异常,并且调用方也不需要使用条件语句来判断函数的返回值了。
x, y = 5, 2
try:
result = divide(x, y)
except ValueError:
print("Invalid inputs")
else:
print("Result is %.1f"% result)
# >>> Result is 2.5
如果发生异常,处理的代码也更加清晰。
x, y = 5, 0
try:
result = divide(x, y)
except ValueError:
print("Invalid inputs")
else:
print("Result is %.1f"% result)
# >>> Invalid inputs
要点:
- 函数返回None来表示特殊意义容易出错,因为None和0及空字符串之类的值,在一些条件表达式中会被认作False。
- 函数在遇到特殊情况时,应该抛出异常,而不是返回None,由调用者负责编写相应的代码处理异常。
用线程来执行阻塞式I/O,但不要用来做并行计算
Python是一种编程语言,根据其实现的不同,有Cpython, Jython, Pypy等,其语法是想通的,但是类库的实现是不同的。目前应用最广泛的Python实现是CPython,即用C语言实现Python及其解释器(JIT编译器)。
CPython分两步来运行Python程序,首先,把文本格式的源代码解析并编译成字节码;然后,用一种基于栈的解释器来运行这份字节码。执行Python程序时,字节码解释器采用GIL(Global Interpreter Lock,解释器全局锁)来确保协调一致,GIL实际上就是一把互斥锁,防止某个线程在执行过程中被另外一个线程打断,这种线程间的干扰操作可能会破坏解释器的状态,GIL可保证每条字节码指令均能够正确地与CPython实现及其扩展模块协同运作。
GIL的副作用也很明显,尽管Python程序也支持多线程,但是由于GIL的保护,同一时刻只有一条线程可以向前执行,这就意味着,在Python中利用多线程来并行计算(parallel computation),并不能取得理想的结果。
如例所示,factorize()对一个数字进行因式分解。
def factorize(number):
for i in range(1, number + 1):
if number % i == 0:
yield i
对数列逐个进行分解:
from time import time
numbers = [2139079, 1214759, 1516637, 1852285]
start = time()
for number in numbers:
list(factorize(number))
end = time()
print('Took %.3f seconds' % (end - start))
# >>> Took 1.445 seconds
为数列中每一个数都启动一个线程,在线程中进行分解:
from threading import Thread
numbers = [2139079, 1214759, 1516637, 1852285]
class FactorizeThread(Thread):
def __init__(self, number):
super().__init__()
self.number = number
def run(self):
self.factors = list(factorize(self.number))
start = time()
threads = []
for number in numbers:
thread = FactorizeThread(number)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
end = time()
print('Took %.3f seconds' % (end - start))
# >>> Took 1.435 seconds
从执行结果可以看出,使用多线程进行并行计算因为受到GIL的影响,在整体的执行速度上与逐个执行factorize()相比并没有太大差别。但是Python程序在处理阻塞式I/O操作时必须花一些时间,如:读写文件,网络间通信,与外设交互等,在这段时间内,Python多线程可以把程序与这些耗时操作隔离开。
如例所示,slow_systemcall()模拟一个耗时的系统调用,请求操作系统阻塞0.1秒,然后把控制权还给程序。
import select, socket
def slow_systemcall():
select.select([socket.socket()], [], [], 0.1)
如果是逐个执行slow_systemcall()函数,主程序在运行到slow_systemcall()函数时,必须等待函数退出才能继续向下执行,因此,程序所需的总时间将会随着调用次数的增加而增加。
start = time()
for _ in range(100):
slow_systemcall()
end = time()
print('Took %.3f seconds' % (end - start))
# >>> Took 10.092 seconds
把slow_systemcall()函数放到多个线程中执行,保证主程序在运行到slow_systemcall()函数时,也能够同时在主程序里执行所需的计算工作。
start = time()
threads = []
for _ in range(100):
thread = Thread(target=slow_systemcall)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
end = time()
print('Took %.3f seconds' % (end - start))
# >>> Took 0.128 seconds
要点:
- 因为GIL的限制,Python多线程不能在多核的CPU上并行执行字节码。
- Python多线程可以使程序在执行阻塞式I/O操作时,并行地执行普通运算操作。
使用协程代替线程来实现并发运行多个函数
Python可以使用线程来模拟同时运行多个函数,但是线程有以下三个缺点:
- 为了确保数据安全,必须使用特殊的工具来协调这些线程,如:Lock,Queue等,对于复杂的多线程代码,会变得难以扩展和维护。
- 线程需要占用大量内存,每个正在执行的线程大约需要8MB内存。如果程序中线程过多,可能就会出现问题。
- 线程启动开销大,如果程序中创建新线程过于频繁,就会降低整个程序的运行速度。
Python中的协程(coroutine)可以避免上述问题,协程是生成器的一种扩展,启动生成器所需的开销,与调用函数的开销差不多,处于活跃状态的协程,只会占用不到1KB的内存。子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
协程通过函数中使用yield语句来实现,yield不但可以返回一个值,它还可以接收调用者发出的参数。每当生成器执行到yield表达式的时候,通过send()函数向生成器回传一个值,而这个值代表了yield表达式的执行结果。
def my_coroutine():
while True:
received = yield
print('Received:', received)
it = my_coroutine()
next(it) # Prime the coroutine
it.send('First')
it.send('Second')
'''
>>>
Received: First
Received: Second
'''
在生成器调用send()之前,需要调用一次next()函数,用来将生成器推进到第一条yield表达式那里准备接受第一个send()。
如例所示,编写一个协程,并给它发送许多数值,协程每次收到一个数值后返回当前所统计到的最小值,并等待传入下一个数值。
def minimize():
current = yield
while True:
value = yield current
current = min(value, current)
it = minimize()
next(it) # Prime the generator
print(it.send(10))
print(it.send(4))
print(it.send(22))
print(it.send(-1))
'''
>>>
10
4
4
-1
'''
生成器函数minimize()会一直运行下去,每次调用send()之后都会产生新的值。协程与线程相似之处在于,它可以消耗由外部所传入的输入数据,并产生相应的输出结果。协程与线程不同之处在于,协程会在生成器函数的每个yield表达式那里暂停,直到外部再次调用send()函数,它才会继续执行到下一条yield表达式。更为重要的是,程序可以通过生成器产生的输出值,来推进其他的生成器函数,使得其他的生成器函数也执行到它们各自的下一条yield表达式。接连推进多个独立的生成器,即可模拟出Python线程的并发行为,让程序看上去好像在同时运行多个函数。
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。如例所示,如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)
'''
>>>
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
'''
示例中consumer函数是一个generator,把一个consumer传入produce后:
- 首先调用c.send(None)启动生成器;
- 然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
- consumer通过yield拿到消息,处理,又通过yield把结果传回;
- produce拿到consumer处理的结果,继续生产下一条消息;
- produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
要点:
- 协程提供了一种有效的方式,让程序看上去好像能够哦同时运行大量函数。
- 协程是一个强大的工具,可以把程序的核心逻辑,与程序与外部交互时所用的代码相分离。
- 协程不具有线程的某些优点,例如,如果在执行I/O阻塞程序时, 协程会让整个任务挂起直到操作完成。
用unnittest来测试代码
Python没有静态类型检查机制,编译器不能保证程序一定会在运行时被正确地执行Python语言的动态特性使得开发者能够非常容易地编写测试代码,unittest是Python自带的单元测试模块,为构建和执行测试提供了非常丰富的工具集。
如例所示,to_str()将bytes类型转化为str类型:
def to_str(data):
if isinstance(data, str):
return data
elif isinstance(data, bytes):
return data.decode('utf-8')
else:
raise TypeError('Must supply str or bytes, '
'found: %r' % data)
编写to_str()的测试用例MyTest,MyTest是unittest.TestCase的子类,三个独立的测试用例以test开头,用这样的命名规则来约定哪些方法是test runner需要执行的。我们也可以自定义一些辅助方法,这些方法的函数名不能以test开头。
如果需要在运行测试用例前后,执行一些初始化及清理工作,可以覆写setUp()和tearDown()。系统在执行每个测试前,调用一次setUp(),若 setUp() 方法引发异常,测试框架会认为测试发生了错误,因此测试方法不会被运行。若 setUp() 成功运行,无论测试方法是否成功,都会运行一次tearDown()。
from unittest import TestCase, main
class MyTest(TestCase):
def setUp(self):
print("setup")
def tearDown(self):
print("teardown")
# Test methods follow
def test_to_str_bytes(self):
self.assertEqual('hello', to_str(b'hello'))
def test_to_str_str(self):
self.assertEqual('hello', to_str('hello'))
def test_to_str_bad(self):
self.assertRaises(TypeError, to_str, object())
if __name__ == '__main__':
main()
'''
>>>
setup
teardown
.setup
teardown
.setup
teardown
.
----------------------------------------------------------------------
Ran 3 tests in 0.055s
OK
'''
要点:
- 要想确信Python程序能够正常运行,最好的办法就是编写测试代码。
- 内置的unittest模块提供了测试所需的很多功能,足以满足大部分用户的需求。
- 可以在TestCase子类中为每一个需要测试的用例,定义对应的测试方法,该测试方法的名称必须以test开头,使用assert*()方法来检查结果。