前言¶
如题所述,multiprocessing.Queue 在特定场景下会出现内部锁异常, 导致 Queue 实例出现既无法写入数据也无法读取数据的情况。
读取方意外退出导致内部异常¶
一个场景是当读取方意外退出后,即便后来又有了新的读取方, 还是会出现内部异常最终导致写入方无法写入的问题。
复现问题¶
可以通过下面的代码复现这个问题:
import multiprocessing
import time
def reader(q):
while True:
try:
q.get(timeout=5)
print('read')
except Exception as e:
print('reader error: {!r}'.format(e))
time.sleep(1)
def writer(q):
while True:
try:
q.put_nowait(time.time())
print('write')
except Exception as e:
print('writer error: {!r}'.format(e))
time.sleep(1)
q = multiprocessing.Queue(2)
w = multiprocessing.Process(target=writer, args=(q,))
w.start()
r = multiprocessing.Process(target=reader, args=(q,))
r.start()
print('reader pid: {}'.format(r.pid))
while True:
if not r.is_alive():
print('reader dead, start new reader')
r = multiprocessing.Process(target=reader, args=(q,))
r.start()
print('reader pid: {}'.format(r.pid))
time.sleep(1)
运行这段代码,在代码运行过程中 kill 掉 reader 进程模拟读取方意外退出的情况:
$ python3.8 reader_deadlock.py
reader pid: 53
write
write
writer error: Full()
read
read
write
write
writer error: Full()
read
read
...
read
read
reader dead, start new reader # <-- kill 53
reader pid: 55
write
write
writer error: Full()
writer error: Full()
writer error: Full()
writer error: Full()
writer error: Full()
writer error: Full()
reader error: Empty()
writer error: Full()
writer error: Full()
writer error: Full()
writer error: Full()
writer error: Full()
writer error: Full()
reader error: Empty()
可以看到,在杀掉 reader 进程后,即便后来再重新启动了一个 reader 进程, 还是出现了 writer 写不进去、reader 读不出来的问题。
原因¶
为什么会出现这个问题呢?看一下 Queue 内部 get 和 put 方法的 关键代码 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | class Queue(object): def __init__(self, maxsize=0, *, ctx): # ... self._rlock = ctx.Lock() # ctx.Lock: class Lock(SemLock) # ... self._sem = ctx.BoundedSemaphore(maxsize) # ... def get(self, block=True, timeout=None): # ... if block and timeout is None: with self._rlock: res = self._recv_bytes() self._sem.release() # <- 释放 semaphore else: if block: deadline = time.monotonic() + timeout if not self._rlock.acquire(block, timeout): # <- 获取锁 raise Empty try: if block: timeout = deadline - time.monotonic() if not self._poll(timeout): # <- 阻塞直到有数据或超时 raise Empty elif not self._poll(): raise Empty res = self._recv_bytes() self._sem.release() # <- 释放 semaphore finally: self._rlock.release() # <- 释放获取的锁 # unserialize the data after having released the lock return _ForkingPickler.loads(res) def put_nowait(self, obj): return self.put(obj, False) |
为什么 kill 掉 reader 进程后就会出现前面的问题呢, 因为当 reader 进程阻塞在 get(timeout=5) 调用时,实际是阻塞在上面的 24 行代码那里, 此时 reader 进程已经通过 19 行的代码获取到了 self._rlock 这个锁, 当 kill reader 进程时,reader 进程退出并且退出的时候不会执行 31 行的释放锁操作。 当重新启动一个 reader 进程后,新 reader 进程执行 get(timeout=5) 调用时 会因为 19 行获取不到 self._rlock 这个锁导致总是抛出 Empty 异常,因为这个锁在前面那个 reader 进程里被 acquire 了,但是没有被 release 也永远都不会被 release 了,导致内部的 self._rlock 这个锁异常。
新的 reader 进程因为内部的 self._rlock 锁异常导致一直无法读取数据,进而导致 writer 进程无法写入数据,因为队列一直是满的。
验证¶
修改一下前面的代码来验证一下上面所说的原因:
def reader(q):
while True:
try:
q.get(timeout=5)
print('read')
except Exception as e:
print('reader error: {!r}, {}'.format(e, q._rlock)) # 增加显式 _rlock 信息
time.sleep(1)
验证:
$ python reader_deadlock_rlock.py
reader pid: 464
write
write
writer error: Full()
...
read
read
reader dead, start new reader
reader pid: 466
write
write
writer error: Full()
...
reader error: Empty(), <Lock(owner=SomeOtherProcess)> # _rlock 一直被其他进程所占用
writer error: Full()
...
reader error: Empty(), <Lock(owner=SomeOtherProcess)> # _rlock 一直被其他进程所占用
writer error: Full()
应对方法¶
写入方意外退出导致内部异常¶
另一个场景是当写入方意外退出后,极端情况下会出现内部异常导致读取方无法读取数据、 同时新加入的写入方也无法写入数据的问题。
复现问题¶
因为是极端情况下会出现的问题,暂时没找到 100% 复现问题的方法,只能从代码实现层面分析。
原因¶
先来看一下 Queue 的 内部实现逻辑 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 | class Queue(object): def __init__(self, maxsize=0, *, ctx): # ... self._maxsize = maxsize self._reader, self._writer = connection.Pipe(duplex=False) self._rlock = ctx.Lock() self._opid = os.getpid() if sys.platform == 'win32': self._wlock = None else: self._wlock = ctx.Lock() # 使用 BoundedSemaphore 控制队列大小 self._sem = ctx.BoundedSemaphore(maxsize) self._after_fork() if sys.platform != 'win32': register_after_fork(self, Queue._after_fork) def _after_fork(self): debug('Queue._after_fork()') self._notempty = threading.Condition(threading.Lock()) self._buffer = collections.deque() self._thread = None self._jointhread = None self._joincancelled = False self._closed = False self._close = None self._send_bytes = self._writer.send_bytes self._recv_bytes = self._reader.recv_bytes self._poll = self._reader.poll def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") if not self._sem.acquire(block, timeout): raise Full # acquire 后 _sem 中的 count - 1,表示当前 queue 可用大小减一 with self._notempty: if self._thread is None: self._start_thread() # 数据放到 buffer 里,通知后台线程去读取 buffer 里的数据 self._buffer.append(obj) self._notempty.notify() def get(self, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") if block and timeout is None: with self._rlock: # 从 Pipe 中读取数据 res = self._recv_bytes() # release 后 _sem 中的 count + 1,当前 queue 可用大小加一 self._sem.release() else: if block: deadline = time.monotonic() + timeout if not self._rlock.acquire(block, timeout): raise Empty try: if block: timeout = deadline - time.monotonic() if not self._poll(timeout): raise Empty elif not self._poll(): raise Empty # 从 Pipe 中读取数据 res = self._recv_bytes() # release 后 _sem 中的 count + 1,当前 queue 可用大小加一 self._sem.release() finally: self._rlock.release() # unserialize the data after having released the lock return _ForkingPickler.loads(res) def empty(self): return not self._poll() def full(self): return self._sem._semlock._is_zero() def get_nowait(self): return self.get(False) def put_nowait(self, obj): return self.put(obj, False) def _start_thread(self): debug('Queue._start_thread()') # Start thread which transfers data from buffer to pipe self._buffer.clear() self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send_bytes, self._wlock, self._writer.close, self._ignore_epipe, self._on_queue_feeder_error, self._sem), name='QueueFeederThread' ) self._thread.daemon = True debug('doing self._thread.start()') self._thread.start() debug('... done self._thread.start()') # ... @staticmethod def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, onerror, queue_sem): debug('starting thread to feed data to pipe') nacquire = notempty.acquire nrelease = notempty.release nwait = notempty.wait bpopleft = buffer.popleft sentinel = _sentinel if sys.platform != 'win32': wacquire = writelock.acquire wrelease = writelock.release else: wacquire = None # 在这个后台线程中把 buffer 里的数据写入到 Pipe while 1: try: nacquire() try: if not buffer: nwait() finally: nrelease() try: while 1: obj = bpopleft() if obj is sentinel: debug('feeder thread got sentinel -- exiting') close() return # serialize the data before acquiring the lock obj = _ForkingPickler.dumps(obj) if wacquire is None: send_bytes(obj) else: wacquire() try: send_bytes(obj) finally: wrelease() except IndexError: pass except Exception as e: if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: return # Since this runs in a daemon thread the resources it uses # may be become unusable while the process is cleaning up. # We ignore errors which happen after the process has # started to cleanup. if is_exiting(): info('error in queue thread: %s', e) return else: # Since the object has not been sent in the queue, we need # to decrease the size of the queue. The error acts as # if the object had been silently removed from the queue # and this step is necessary to have a properly working # queue. queue_sem.release() onerror(e, obj) |
从上面的代码可以得知,put 和 get 方法的与这个问题有关的关键流程如下:
- 队列大小是通过 _sem 这个 BoundedSemaphore(maxsize) 实例控制的。
- get 和 put 之间的数据是通过 pipe 传递的。
put(obj) 操作:
- 执行 _sem.acquire() ,如果获取不到 semaphore 的话,表示当前队列已满。
- 获取到 semaphore 后(内部计数器加一,此时队列可用大小减一),把数据放到 buffer 中。
- 后台有个线程会去从 buffer 中读取数据,然后写入到 pipe 中。
get() 操作:
- 从 pipe 中读取数据。
- 读到数据后,执行 _sem.release() ,semaphore 内部计数器减一,此时队列可用大小加一。
问题出现的场景就是,上面 put(object) 中的后台线程 从 buffer 中读取数据然后写入 pipe 的过程中进程意外退出了, 此时因为数据没有写入到 pipe 中, get() 操作不会执行第2步的 _sem.release() 操作, 导致 put(object) 中 acquire 的 semaphore 永远不会被释放,也就是说当前队列可用大小会比实际大小小1。 如果这个进程意外退出的场景多出现几次,最终的结果就是有很多个 semaphore 永远不会被释放,极端情况下出现队列可用大小为 0 的情况,但是实际上队列中并没有数据,出现既写不进去数据又读取不了数据的情况。
Comments