Python: multiprocessing.Queue 在特定场景下会出现内部锁异常

前言

如题所述,multiprocessing.Queue 在特定场景下会出现内部锁异常, 导致 Queue 实例出现既无法写入数据也无法读取数据的情况。

读取方意外退出导致内部异常

一个场景是当读取方意外退出后,即便后来又有了新的读取方, 还是会出现内部异常最终导致写入方无法写入的问题。

复现问题

可以通过下面的代码复现这个问题:

import multiprocessing
import time


def reader(q):
    while True:
        try:
            q.get(timeout=5)
            print('read')
        except Exception as e:
            print('reader error: {!r}'.format(e))
            time.sleep(1)


def writer(q):
    while True:
        try:
            q.put_nowait(time.time())
            print('write')
        except Exception as e:
            print('writer error: {!r}'.format(e))
            time.sleep(1)


q = multiprocessing.Queue(2)

w = multiprocessing.Process(target=writer, args=(q,))
w.start()

r = multiprocessing.Process(target=reader, args=(q,))
r.start()
print('reader pid: {}'.format(r.pid))

while True:
    if not r.is_alive():
        print('reader dead, start new reader')
        r = multiprocessing.Process(target=reader, args=(q,))
        r.start()
        print('reader pid: {}'.format(r.pid))
    time.sleep(1)

运行这段代码,在代码运行过程中 kill 掉 reader 进程模拟读取方意外退出的情况:

$ python3.8 reader_deadlock.py
reader pid: 53
write
write
writer error: Full()
read
read
write
write
writer error: Full()
read
read
...
read
read
reader dead, start new reader   # <-- kill 53
reader pid: 55
write
write
writer error: Full()
writer error: Full()
writer error: Full()
writer error: Full()
writer error: Full()
writer error: Full()
reader error: Empty()
writer error: Full()
writer error: Full()
writer error: Full()
writer error: Full()
writer error: Full()
writer error: Full()
reader error: Empty()

可以看到,在杀掉 reader 进程后,即便后来再重新启动了一个 reader 进程, 还是出现了 writer 写不进去、reader 读不出来的问题。

原因

为什么会出现这个问题呢?看一下 Queue 内部 getput 方法的 关键代码 :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
 class Queue(object):

     def __init__(self, maxsize=0, *, ctx):
         # ...
         self._rlock = ctx.Lock()  # ctx.Lock:  class Lock(SemLock)
         # ...
         self._sem = ctx.BoundedSemaphore(maxsize)
         # ...

     def get(self, block=True, timeout=None):
         # ...
         if block and timeout is None:
             with self._rlock:
                 res = self._recv_bytes()
             self._sem.release()  # <- 释放 semaphore
         else:
             if block:
                 deadline = time.monotonic() + timeout
             if not self._rlock.acquire(block, timeout):  # <- 获取锁
                 raise Empty
             try:
                 if block:
                     timeout = deadline - time.monotonic()
                     if not self._poll(timeout):  # <- 阻塞直到有数据或超时
                         raise Empty
                 elif not self._poll():
                     raise Empty
                 res = self._recv_bytes()
                 self._sem.release()    # <- 释放 semaphore
             finally:
                 self._rlock.release()   # <- 释放获取的锁
         # unserialize the data after having released the lock
         return _ForkingPickler.loads(res)

     def put_nowait(self, obj):
         return self.put(obj, False)

为什么 kill 掉 reader 进程后就会出现前面的问题呢, 因为当 reader 进程阻塞在 get(timeout=5) 调用时,实际是阻塞在上面的 24 行代码那里, 此时 reader 进程已经通过 19 行的代码获取到了 self._rlock 这个锁, 当 kill reader 进程时,reader 进程退出并且退出的时候不会执行 31 行的释放锁操作。 当重新启动一个 reader 进程后,新 reader 进程执行 get(timeout=5) 调用时 会因为 19 行获取不到 self._rlock 这个锁导致总是抛出 Empty 异常,因为这个锁在前面那个 reader 进程里被 acquire 了,但是没有被 release 也永远都不会被 release 了,导致内部的 self._rlock 这个锁异常。

新的 reader 进程因为内部的 self._rlock 锁异常导致一直无法读取数据,进而导致 writer 进程无法写入数据,因为队列一直是满的。

验证

修改一下前面的代码来验证一下上面所说的原因:

def reader(q):
    while True:
        try:
            q.get(timeout=5)
            print('read')
        except Exception as e:
            print('reader error: {!r}, {}'.format(e, q._rlock))  # 增加显式 _rlock 信息
            time.sleep(1)

验证:

$ python reader_deadlock_rlock.py
reader pid: 464
write
write
writer error: Full()
...
read
read
reader dead, start new reader
reader pid: 466
write
write
writer error: Full()
...
reader error: Empty(), <Lock(owner=SomeOtherProcess)>  # _rlock 一直被其他进程所占用
writer error: Full()
...
reader error: Empty(), <Lock(owner=SomeOtherProcess)>  # _rlock 一直被其他进程所占用
writer error: Full()

应对方法

写入方意外退出导致内部异常

另一个场景是当写入方意外退出后,极端情况下会出现内部异常导致读取方无法读取数据、 同时新加入的写入方也无法写入数据的问题。

复现问题

因为是极端情况下会出现的问题,暂时没找到 100% 复现问题的方法,只能从代码实现层面分析。

原因

先来看一下 Queue内部实现逻辑 :

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
 class Queue(object):

     def __init__(self, maxsize=0, *, ctx):
         # ...
         self._maxsize = maxsize
         self._reader, self._writer = connection.Pipe(duplex=False)
         self._rlock = ctx.Lock()
         self._opid = os.getpid()
         if sys.platform == 'win32':
             self._wlock = None
         else:
             self._wlock = ctx.Lock()

         # 使用 BoundedSemaphore 控制队列大小
         self._sem = ctx.BoundedSemaphore(maxsize)

         self._after_fork()

         if sys.platform != 'win32':
             register_after_fork(self, Queue._after_fork)

     def _after_fork(self):
         debug('Queue._after_fork()')
         self._notempty = threading.Condition(threading.Lock())
         self._buffer = collections.deque()
         self._thread = None
         self._jointhread = None
         self._joincancelled = False
         self._closed = False
         self._close = None
         self._send_bytes = self._writer.send_bytes
         self._recv_bytes = self._reader.recv_bytes
         self._poll = self._reader.poll

     def put(self, obj, block=True, timeout=None):
         if self._closed:
             raise ValueError(f"Queue {self!r} is closed")
         if not self._sem.acquire(block, timeout):
             raise Full
         # acquire 后 _sem 中的 count - 1,表示当前 queue 可用大小减一

         with self._notempty:
             if self._thread is None:
                 self._start_thread()
             # 数据放到 buffer 里,通知后台线程去读取 buffer 里的数据
             self._buffer.append(obj)
             self._notempty.notify()

     def get(self, block=True, timeout=None):
         if self._closed:
             raise ValueError(f"Queue {self!r} is closed")
         if block and timeout is None:
             with self._rlock:
                 # 从 Pipe 中读取数据
                 res = self._recv_bytes()
             # release 后 _sem 中的 count + 1,当前 queue 可用大小加一
             self._sem.release()
         else:
             if block:
                 deadline = time.monotonic() + timeout
             if not self._rlock.acquire(block, timeout):
                 raise Empty
             try:
                 if block:
                     timeout = deadline - time.monotonic()
                     if not self._poll(timeout):
                         raise Empty
                 elif not self._poll():
                     raise Empty
                 # 从 Pipe 中读取数据
                 res = self._recv_bytes()
                 # release 后 _sem 中的 count + 1,当前 queue 可用大小加一
                 self._sem.release()
             finally:
                 self._rlock.release()
         # unserialize the data after having released the lock
         return _ForkingPickler.loads(res)

     def empty(self):
         return not self._poll()

     def full(self):
         return self._sem._semlock._is_zero()

     def get_nowait(self):
         return self.get(False)

     def put_nowait(self, obj):
         return self.put(obj, False)

     def _start_thread(self):
         debug('Queue._start_thread()')

         # Start thread which transfers data from buffer to pipe
         self._buffer.clear()
         self._thread = threading.Thread(
             target=Queue._feed,
             args=(self._buffer, self._notempty, self._send_bytes,
                   self._wlock, self._writer.close, self._ignore_epipe,
                   self._on_queue_feeder_error, self._sem),
             name='QueueFeederThread'
         )
         self._thread.daemon = True

         debug('doing self._thread.start()')
         self._thread.start()
         debug('... done self._thread.start()')
         # ...

     @staticmethod
     def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
               onerror, queue_sem):
         debug('starting thread to feed data to pipe')
         nacquire = notempty.acquire
         nrelease = notempty.release
         nwait = notempty.wait
         bpopleft = buffer.popleft
         sentinel = _sentinel
         if sys.platform != 'win32':
             wacquire = writelock.acquire
             wrelease = writelock.release
         else:
             wacquire = None

         # 在这个后台线程中把 buffer 里的数据写入到 Pipe
         while 1:
             try:
                 nacquire()
                 try:
                     if not buffer:
                         nwait()
                 finally:
                     nrelease()
                 try:
                     while 1:
                         obj = bpopleft()
                         if obj is sentinel:
                             debug('feeder thread got sentinel -- exiting')
                             close()
                             return

                         # serialize the data before acquiring the lock
                         obj = _ForkingPickler.dumps(obj)
                         if wacquire is None:
                             send_bytes(obj)
                         else:
                             wacquire()
                             try:
                                 send_bytes(obj)
                             finally:
                                 wrelease()
                 except IndexError:
                     pass
             except Exception as e:
                 if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                     return
                 # Since this runs in a daemon thread the resources it uses
                 # may be become unusable while the process is cleaning up.
                 # We ignore errors which happen after the process has
                 # started to cleanup.
                 if is_exiting():
                     info('error in queue thread: %s', e)
                     return
                 else:
                     # Since the object has not been sent in the queue, we need
                     # to decrease the size of the queue. The error acts as
                     # if the object had been silently removed from the queue
                     # and this step is necessary to have a properly working
                     # queue.
                     queue_sem.release()
                     onerror(e, obj)

从上面的代码可以得知,put 和 get 方法的与这个问题有关的关键流程如下:

  • 队列大小是通过 _sem 这个 BoundedSemaphore(maxsize) 实例控制的。
  • getput 之间的数据是通过 pipe 传递的。

put(obj) 操作:

  1. 执行 _sem.acquire() ,如果获取不到 semaphore 的话,表示当前队列已满。
  2. 获取到 semaphore 后(内部计数器加一,此时队列可用大小减一),把数据放到 buffer 中。
  3. 后台有个线程会去从 buffer 中读取数据,然后写入到 pipe 中。

get() 操作:

  1. 从 pipe 中读取数据。
  2. 读到数据后,执行 _sem.release() ,semaphore 内部计数器减一,此时队列可用大小加一。

问题出现的场景就是,上面 put(object) 中的后台线程 从 buffer 中读取数据然后写入 pipe 的过程中进程意外退出了, 此时因为数据没有写入到 pipe 中, get() 操作不会执行第2步的 _sem.release() 操作, 导致 put(object) 中 acquire 的 semaphore 永远不会被释放,也就是说当前队列可用大小会比实际大小小1。 如果这个进程意外退出的场景多出现几次,最终的结果就是有很多个 semaphore 永远不会被释放,极端情况下出现队列可用大小为 0 的情况,但是实际上队列中并没有数据,出现既写不进去数据又读取不了数据的情况。

应对方法

总结


Comments