python: threading 库指南

MindMap

py- threading.png

高清原图: 在此

Thread Objects

  • GIL: global interpreter lock
    The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time.

  • class threading.Thread(.., target=None, .., args=(), kwargs={}, .., ..)
    start() 函数调用 run(), run() 调用 target(*args, **kwargs).

  • Thread.__init__()
    If the subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

class Mythread(threading.Thread):
    def __init__(self):
        super(Mythread, self).__init__()
        ...
    def run(self):
        ...

Lock/RLock Objects

class threading.RLock

  • acquire(blocking=True, timeout=-1)
    When invoked without arguments: if this thread already owns the lock, increment the recursion level by one, and return immediately. Otherwise, if another thread owns the lock, block until the lock is unlocked. Once the lock is unlocked (not owned by any thread), then grab ownership, set the recursion level to one, and return. If more than one thread is blocked waiting until the lock is unlocked, only one at a time will be able to grab ownership of the lock. There is no return value in this case.
    ...

  • release()
    Release a lock, decrementing the recursion level. If after the decrement it is zero, reset the lock to unlocked (not owned by any thread), and if any other threads are blocked waiting for the lock to become unlocked, allow exactly one of them to proceed. If after the decrement the recursion level is still nonzero, the lock remains locked and owned by the calling thread.

官方文档结合代码一起看,更清楚:

  • in Lib/threading.py
class _RLock:
    ...
    def acquire(self, blocking=True, timeout=-1):
        me = get_ident()
        if self._owner == me:
            self._count += 1
            return 1
        rc = self._block.acquire(blocking, timeout)
        if rc:
            self._owner = me
            self._count = 1
        return rc
    ...
    def release(self):
        if self._owner != get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count = count = self._count - 1
        if not count:
            self._owner = None
            self._block.release()
    ...
    ...

Condition Objects

苍松 同学说: 可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。

in Lib/threading.py

class Condition:
    def __init__(self, lock=None):
        if lock is None:      # If the lock argument is None, a new RLock object is created and used as the underlying lock.
            lock = RLock()
        self._lock = lock
        ...
        self._waiters = _deque()      # 等待池 
    ...
    def wait(self, timeout=None):
        if not self._is_owned():      # If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised.
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)    # 将它放到等待池中
        saved_state = self._release_save()      # 释放锁
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()      # 获取锁
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass

    def wait_for(self, predicate, timeout=None):
        endtime = None
        waittime = timeout
        result = predicate()
        while not result:
            if waittime is not None:
                if endtime is None:
                    endtime = _time() + waittime
                else:
                    waittime = endtime - _time()
                    if waittime <= 0:
                        break
            self.wait(waittime)
            result = predicate()
        return result

    def notify(self, n=1):
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()      # 唤醒等待池的线程,并把它放到锁定池中
            try:
                all_waiters.remove(waiter)      # 移除已被唤醒了的线程
            except ValueError:
                pass

    def notify_all(self):
        self.notify(len(self._waiters))      # 唤醒所有在等待池中的线程

Semaphore Objects

A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release().

  • in Lib/threading.py
class Semaphore:
    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                self._value -= 1
                rc = True
        return rc
    __enter__ = acquire

    def release(self):
        with self._cond:
            self._value += 1
            self._cond.notify()

    def __exit__(self, t, v, tb):
        self.release()



class BoundedSemaphore(Semaphore):
    def __init__(self, value=1):
        Semaphore.__init__(self, value)
        self._initial_value = value

    def release(self):
        with self._cond:
            if self._value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._value += 1
            self._cond.notify()

Event Objects

one thread signals an event and other threads wait for it.

苍松 同学说:
Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。

-In Lib/threading.py

class Event:
    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = False

    ...
    def is_set(self):
        return self._flag

    isSet = is_set

    def set(self):
        with self._cond:
            self._flag = True
            self._cond.notify_all()        # 唤醒所有在等待阻塞中的线程

    def clear(self):
        with self._cond:
            self._flag = False

    def wait(self, timeout=None):
        with self._cond:
            signaled = self._flag
            if not signaled:
                signaled = self._cond.wait(timeout)
            return signaled

Timer Objects

This class represents an action that should be run only after a certain amount of time has passed — a timer.

  • in Lib/threading.py
class Timer(Thread):
   def __init__(self, interval, function, args=None, kwargs=None):
       Thread.__init__(self)
       self.interval = interval
       self.function = function
       self.args = args if args is not None else []
       self.kwargs = kwargs if kwargs is not None else {}
       self.finished = Event()

   def cancel(self):
       """Stop the timer if it hasn't finished yet."""
       self.finished.set()          # set flag=True, and then the return of is_set() is True, so it would not execute the function.

   def run(self):
       self.finished.wait(self.interval)        # 在此等待阻塞中可能被取消
       if not self.finished.is_set():          # 若上一步出来,便表示没被取消,flag = False, 运行此function.
           self.function(*self.args, **self.kwargs)
       self.finished.set()

Barrier Obects

This class provides a simple synchronization primitive for use by a fixed number of threads that need to wait for each other.

来自 Mayank Kumar 同学的例子

import threading
 
barrier = threading.Barrier(3)
 
class thread(threading.Thread):
    def __init__(self, thread_ID):
        threading.Thread.__init__(self)
        self.thread_ID = thread_ID
    def run(self):
        print(str(self.thread_ID) + "\n")
        print("Parties = " + str(barrier.parties) + "\n")
        print("n_waiting = " + str(barrier.n_waiting) + "\n")
        barrier.wait()
         
thread1 = thread(100)
thread2 = thread(101)
 
thread1.start()
thread2.start()
 
barrier.wait()
 
print(str(barrier.broken) + "\n")
barrier.reset()
print("n_waiting after reset = " + str(barrier.n_waiting))
barrier.abort()
print("End")

运行结果:

100
Parties = 3
101
n_waiting = 0
Parties = 3
n_waiting = 1
False
n_waiting after reset = 0
End

with statement

All of the objects provided by this module that have acquire() and release() methods can be used as context managers for a with statement. The acquire() method will be called when the block is entered, and release() will be called when the block is exited.

with some_lock:
    # do something...

is equivalent to

some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()

read more

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容