Problem description
I am fairly new to Python. I am using the multiprocessing module to read lines of text from stdin, transform them in some way, and write them to a database. Here's a snippet of my code:
import multiprocessing
import sys

# insert(batch, line_number) is defined elsewhere and writes one batch to the database.
batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        pool.apply_async(insert, args=(batch, i + 1))
        batch = []
pool.apply_async(insert, args=(batch, i))
pool.close()
pool.join()
This all works fine until I process huge input files (hundreds of millions of lines) that I pipe into my Python program. At some point, when my database gets slower, I see the memory filling up.
After some experimenting, it turned out that pool.apply_async and pool.map_async never block, so the queue of calls waiting to be processed grows bigger and bigger.
What is the correct approach to my problem? I would expect a parameter I can set that makes the pool.apply_async call block as soon as a certain queue length has been reached. AFAIR, in Java one can give the ThreadPoolExecutor a BlockingQueue with a fixed length for that purpose.
Thanks!
Recommended answer
The apply_async and map_async functions are designed not to block the main process. To achieve that, the Pool maintains an internal Queue whose size, unfortunately, cannot be changed.
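To make the non-blocking behaviour concrete, here is a minimal sketch. It assumes Python 3 and uses multiprocessing.pool.ThreadPool, which exposes the same apply_async interface as Pool, so that the workers can share a threading.Event. The names gate and slow_insert are hypothetical and stand in for a stalled database.

```python
import threading
from multiprocessing.pool import ThreadPool  # same apply_async API as Pool

# Workers wait on this event, simulating a database that has stopped responding.
gate = threading.Event()


def slow_insert(batch):
    """Hypothetical stand-in for a database write that is stuck."""
    gate.wait()
    return len(batch)


pool = ThreadPool(2)

# Every call returns at once, even though only two workers exist
# and none of them can make progress yet.
results = [pool.apply_async(slow_insert, args=([n],)) for n in range(1000)]

# All submitted tasks are still outstanding and held in memory.
pending = sum(1 for r in results if not r.ready())

gate.set()  # let the workers drain the backlog
pool.close()
pool.join()
completed = sum(r.get() for r in results)
```

Here all 1000 submissions are accepted while no task can finish, which is the unbounded growth described in the question.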
The problem can be solved by using a Semaphore initialized with the size you want the queue to be. You acquire the semaphore before feeding the pool and release it after a worker has completed the task.
Here's an example that works with Python 2.6 or greater.
from threading import Semaphore
from multiprocessing import Pool


def task_wrapper(f):
    """Python 2 does not allow a callback for methods raising exceptions;
    this wrapper ensures the code run in the worker is exception free.
    """
    try:
        return f()
    except Exception:
        # Swallow the error so the completion callback always fires.
        return None


class TaskManager(object):
    def __init__(self, processes, queue_size):
        self.pool = Pool(processes=processes)
        # One slot per running task plus one per queued task.
        self.workers = Semaphore(processes + queue_size)

    def new_task(self, f):
        """Start a new task, blocks if queue is full."""
        self.workers.acquire()
        self.pool.apply_async(task_wrapper, args=(f,), callback=self.task_done)

    def task_done(self, result):
        """Called once task is done, releases the queue if blocked."""
        # The pool passes the task's return value to the callback; it is unused here.
        self.workers.release()
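Applied to the batching loop from the question, the same semaphore idea might look like the sketch below. It assumes Python 3, where apply_async also accepts an error_callback, so no wrapper function is needed to guarantee the release. BoundedPool, feed and the stand-in insert are hypothetical names. ThreadPool is used only to keep the sketch self-contained; a multiprocessing.Pool could be passed instead.

```python
import threading
from multiprocessing.pool import ThreadPool  # same apply_async API as Pool


class BoundedPool:
    """Hypothetical wrapper: apply_async blocks once max_pending tasks are outstanding."""

    def __init__(self, pool, max_pending):
        self.pool = pool
        self.slots = threading.BoundedSemaphore(max_pending)

    def _release(self, _result_or_error):
        # Runs in the parent process on success and on failure alike.
        self.slots.release()

    def apply_async(self, func, args=()):
        self.slots.acquire()  # blocks while max_pending tasks are outstanding
        try:
            return self.pool.apply_async(
                func, args, callback=self._release, error_callback=self._release
            )
        except Exception:
            self.slots.release()  # submission itself failed, free the slot
            raise


def insert(batch, line_no):
    """Stand-in for the database write from the question."""
    return len(batch)


def feed(lines, bounded, batch_size):
    """Group lines into batches and submit each one through the bounded pool."""
    batch, results = [], []
    for i, content in enumerate(lines, 1):
        batch.append(content)
        if len(batch) >= batch_size:
            results.append(bounded.apply_async(insert, (batch, i)))
            batch = []
    if batch:  # submit the final partial batch, if any
        results.append(bounded.apply_async(insert, (batch, i)))
    return results


pool = ThreadPool(4)
bounded = BoundedPool(pool, max_pending=8)
results = feed(("line %d\n" % n for n in range(25)), bounded, batch_size=10)
pool.close()
pool.join()
sizes = [r.get() for r in results]
```

With max_pending=8, at most eight batches are held in memory at any time, and the reading loop pauses whenever the workers fall behind. For very large inputs it may be preferable not to keep the result objects at all.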
Another example uses the concurrent.futures pool implementation.
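A minimal sketch of that approach, assuming Python 3. BoundedExecutor, gate and slow_insert are hypothetical names, not part of concurrent.futures. A ThreadPoolExecutor is used here so the tasks can share an Event, but the wrapper only relies on submit and add_done_callback, which ProcessPoolExecutor also provides.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class BoundedExecutor:
    """Hypothetical wrapper: submit blocks once max_pending tasks are outstanding."""

    def __init__(self, executor, max_pending):
        self.executor = executor
        self.slots = threading.BoundedSemaphore(max_pending)

    def submit(self, fn, *args, **kwargs):
        self.slots.acquire()  # blocks while max_pending tasks are outstanding
        try:
            future = self.executor.submit(fn, *args, **kwargs)
        except Exception:
            self.slots.release()  # submission failed, free the slot
            raise
        # The done callback fires on success, failure, or cancellation.
        future.add_done_callback(lambda _future: self.slots.release())
        return future


gate = threading.Event()  # simulates a database that is not responding yet


def slow_insert(batch):
    gate.wait()
    return len(batch)


executor = ThreadPoolExecutor(max_workers=2)
bounded = BoundedExecutor(executor, max_pending=2)
futures = [
    bounded.submit(slow_insert, [1, 2, 3]),
    bounded.submit(slow_insert, [4, 5]),
]

# Both slots are taken, so a third submit would block here until a task finishes.
full = not bounded.slots.acquire(blocking=False)

gate.set()
executor.shutdown(wait=True)  # waits for the tasks and their done callbacks
totals = [f.result() for f in futures]
```

Because the release happens in a done callback, a task that raises still frees its slot, so no exception-swallowing wrapper is needed.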