MindMap
高清原图: 在此
Thread Objects
GIL: global interpreter lock
The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time.class threading.Thread(.., target=None, .., args=(), kwargs={}, .., ..)
start() 函数调用 run(), run() 调用 target(*args, **kwargs).Thread.__init__()
If the subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
class Mythread(threading.Thread):
def __init__(self):
super(Mythread, self).__init__()
...
def run(self):
...
Lock/RLock Objects
class threading.RLock
acquire(blocking=True, timeout=-1)
When invoked without arguments: if this thread already owns the lock, increment the recursion level by one, and return immediately. Otherwise, if another thread owns the lock, block until the lock is unlocked. Once the lock is unlocked (not owned by any thread), then grab ownership, set the recursion level to one, and return. If more than one thread is blocked waiting until the lock is unlocked, only one at a time will be able to grab ownership of the lock. There is no return value in this case.
...release()
Release a lock, decrementing the recursion level. If after the decrement it is zero, reset the lock to unlocked (not owned by any thread), and if any other threads are blocked waiting for the lock to become unlocked, allow exactly one of them to proceed. If after the decrement the recursion level is still nonzero, the lock remains locked and owned by the calling thread.
官方文档结合代码一起看,更清楚:
- in Lib/threading.py
class _RLock:
...
def acquire(self, blocking=True, timeout=-1):
me = get_ident()
if self._owner == me:
self._count += 1
return 1
rc = self._block.acquire(blocking, timeout)
if rc:
self._owner = me
self._count = 1
return rc
...
def release(self):
if self._owner != get_ident():
raise RuntimeError("cannot release un-acquired lock")
self._count = count = self._count - 1
if not count:
self._owner = None
self._block.release()
...
...
Condition Objects
苍松 同学说: 可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。
in Lib/threading.py
class Condition:
def __init__(self, lock=None):
if lock is None: # If the lock argument is None, a new RLock object is created and used as the underlying lock.
lock = RLock()
self._lock = lock
...
self._waiters = _deque() # 等待池
...
def wait(self, timeout=None):
if not self._is_owned(): # If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised.
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter) # 将它放到等待池中
saved_state = self._release_save() # 释放锁
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire() # 获取锁
gotit = True
else:
if timeout > 0:
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
return gotit
finally:
self._acquire_restore(saved_state)
if not gotit:
try:
self._waiters.remove(waiter)
except ValueError:
pass
def wait_for(self, predicate, timeout=None):
endtime = None
waittime = timeout
result = predicate()
while not result:
if waittime is not None:
if endtime is None:
endtime = _time() + waittime
else:
waittime = endtime - _time()
if waittime <= 0:
break
self.wait(waittime)
result = predicate()
return result
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self._waiters
waiters_to_notify = _deque(_islice(all_waiters, n))
if not waiters_to_notify:
return
for waiter in waiters_to_notify:
waiter.release() # 唤醒等待池的线程,并把它放到锁定池中
try:
all_waiters.remove(waiter) # 移除已被唤醒了的线程
except ValueError:
pass
def notify_all(self):
self.notify(len(self._waiters)) # 唤醒所有在等待池中的线程
Semaphore Objects
A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release().
- in Lib/threading.py
class Semaphore:
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
self._value = value
def acquire(self, blocking=True, timeout=None):
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
endtime = None
with self._cond:
while self._value == 0:
if not blocking:
break
if timeout is not None:
if endtime is None:
endtime = _time() + timeout
else:
timeout = endtime - _time()
if timeout <= 0:
break
self._cond.wait(timeout)
else:
self._value -= 1
rc = True
return rc
__enter__ = acquire
def release(self):
with self._cond:
self._value += 1
self._cond.notify()
def __exit__(self, t, v, tb):
self.release()
class BoundedSemaphore(Semaphore):
def __init__(self, value=1):
Semaphore.__init__(self, value)
self._initial_value = value
def release(self):
with self._cond:
if self._value >= self._initial_value:
raise ValueError("Semaphore released too many times")
self._value += 1
self._cond.notify()
Event Objects
one thread signals an event and other threads wait for it.
苍松 同学说:
Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。
-In Lib/threading.py
class Event:
def __init__(self):
self._cond = Condition(Lock())
self._flag = False
...
def is_set(self):
return self._flag
isSet = is_set
def set(self):
with self._cond:
self._flag = True
self._cond.notify_all() # 唤醒所有在等待阻塞中的线程
def clear(self):
with self._cond:
self._flag = False
def wait(self, timeout=None):
with self._cond:
signaled = self._flag
if not signaled:
signaled = self._cond.wait(timeout)
return signaled
Timer Objects
This class represents an action that should be run only after a certain amount of time has passed — a timer.
- in Lib/threading.py
class Timer(Thread):
def __init__(self, interval, function, args=None, kwargs=None):
Thread.__init__(self)
self.interval = interval
self.function = function
self.args = args if args is not None else []
self.kwargs = kwargs if kwargs is not None else {}
self.finished = Event()
def cancel(self):
"""Stop the timer if it hasn't finished yet."""
self.finished.set() # set flag=True, and then the return of is_set() is True, so it would not execute the function.
def run(self):
self.finished.wait(self.interval) # 在此等待阻塞中可能被取消
if not self.finished.is_set(): # 若上一步出来,便表示没被取消,flag = False, 运行此function.
self.function(*self.args, **self.kwargs)
self.finished.set()
Barrier Obects
This class provides a simple synchronization primitive for use by a fixed number of threads that need to wait for each other.
来自 Mayank Kumar 同学的例子
import threading
barrier = threading.Barrier(3)
class thread(threading.Thread):
def __init__(self, thread_ID):
threading.Thread.__init__(self)
self.thread_ID = thread_ID
def run(self):
print(str(self.thread_ID) + "\n")
print("Parties = " + str(barrier.parties) + "\n")
print("n_waiting = " + str(barrier.n_waiting) + "\n")
barrier.wait()
thread1 = thread(100)
thread2 = thread(101)
thread1.start()
thread2.start()
barrier.wait()
print(str(barrier.broken) + "\n")
barrier.reset()
print("n_waiting after reset = " + str(barrier.n_waiting))
barrier.abort()
print("End")
运行结果:
100
Parties = 3
101
n_waiting = 0
Parties = 3
n_waiting = 1
False
n_waiting after reset = 0
End
with statement
All of the objects provided by this module that have acquire() and release() methods can be used as context managers for a with
statement. The acquire() method will be called when the block is entered, and release() will be called when the block is exited.
with some_lock:
# do something...
is equivalent to
some_lock.acquire()
try:
# do something...
finally:
some_lock.release()
read more
Python ducumentation: threading
PyMOTW-3: threading — Manage Concurrent Operations Within a Process
苍松 同学的 python--threading多线程总结
tab609 同学的 Python多线程详解