问题描述
我希望将 multiprocessing.queue 转储到列表中.对于该任务,我编写了以下函数:
i wish to dump a multiprocessing.queue into a list. for that task i've written the following function:
import queue
def dump_queue(queue):
"""
empties all pending items in a queue and returns them in a list.
"""
result = []
# start debug code
initial_size = queue.qsize()
print("queue has %s items initially." % initial_size)
# end debug code
while true:
try:
thing = queue.get(block=false)
result.append(thing)
except queue.empty:
# start debug code
current_size = queue.qsize()
total_size = current_size len(result)
print("dumping complete:")
if current_size == initial_size:
print("no items were added to the queue.")
else:
print("%s items were added to the queue." %
(total_size - initial_size))
print("extracted %s items from the queue, queue has %s items
left" % (len(result), current_size))
# end debug code
return result
但由于某种原因它不起作用.
but for some reason it doesn't work.
观察以下 shell 会话:
observe the following shell session:
>>> import multiprocessing >>> q = multiprocessing.queue() >>> for i in range(100): ... q.put([range(200) for j in range(100)]) ... >>> q.qsize() 100 >>> l=dump_queue(q) queue has 100 items initially. dumping complete: 0 items were added to the queue. extracted 1 items from the queue, queue has 99 items left >>> l=dump_queue(q) queue has 99 items initially. dumping complete: 0 items were added to the queue. extracted 3 items from the queue, queue has 96 items left >>> l=dump_queue(q) queue has 96 items initially. dumping complete: 0 items were added to the queue. extracted 1 items from the queue, queue has 95 items left >>>
这里发生了什么?为什么不是所有的物品都被倾倒了?
what's happening here? why aren't all the items being dumped?
推荐答案
试试这个:
import queue
import time
def dump_queue(queue):
"""
empties all pending items in a queue and returns them in a list.
"""
result = []
for i in iter(queue.get, 'stop'):
result.append(i)
time.sleep(.1)
return result
import multiprocessing
q = multiprocessing.queue()
for i in range(100):
q.put([range(200) for j in range(100)])
q.put('stop')
l=dump_queue(q)
print len(l)
多处理队列有一个内部缓冲区,该缓冲区有一个馈线线程,该线程从缓冲区中提取工作并将其刷新到管道中.如果不是所有的对象都被刷新,我可以看到 empty 过早引发的情况.使用哨兵来指示队列的结束是安全的(可靠的).此外,使用 iter(get, sentinel) 习惯用法比依赖 empty 更好.
multiprocessing queues have an internal buffer which has a feeder thread which pulls work off a buffer and flushes it to the pipe. if not all of the objects have been flushed, i could see a case where empty is raised prematurely. using a sentinel to indicate the end of the queue is safe (and reliable). also, using the iter(get, sentinel) idiom is just better than relying on empty.
我不喜欢它可能由于刷新时间而升空(我添加了 time.sleep(.1) 以允许上下文切换到馈线线程,您可能不需要它,没有它它也可以工作 - 它是释放 gil 的习惯).
i don't like that it could raise empty due to flushing timing (i added the time.sleep(.1) to allow a context switch to the feeder thread, you may not need it, it works without it - it's a habit to release the gil).
大驴的