Problem description
I am trying to use a worker pool in Python using Process objects. Each worker (a Process) does some initialization (which takes a non-trivial amount of time), gets passed a series of jobs (ideally using map()), and returns something. No communication is necessary beyond that. However, I can't figure out how to use map() to call my worker's compute() function.
from multiprocessing import Pool, Process

class Worker(Process):
    def __init__(self):
        print('Worker started')
        # do some initialization here
        super(Worker, self).__init__()

    def compute(self, data):
        print('Computing things!')
        return data * data

if __name__ == '__main__':
    # This works fine
    worker = Worker()
    print(worker.compute(3))

    # workers get initialized fine
    pool = Pool(processes=4,
                initializer=Worker)
    data = range(10)

    # How to use my worker pool?
    # (compute is a method of Worker, so this bare name is undefined here)
    result = pool.map(compute, data)
Is a job queue the way to go instead, or can I use map()?
Recommended answer
I would suggest that you use a Queue for this.
from multiprocessing import Process, Queue

class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        print('Worker started')
        # do some initialization here
        print('Computing things!')
        # iter() with a sentinel calls queue.get() until it returns None
        for data in iter(self.queue.get, None):
            # Use data
            pass
Now you can start a pile of these, all getting work from a single queue:
request_queue = Queue()
for i in range(4):
    Worker(request_queue).start()
for data in the_real_source:
    request_queue.put(data)
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
    request_queue.put(None)
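The snippets above only feed work to the workers; the question also asks for something to be returned. A minimal sketch of one way to do that, assuming a second queue for results. The names result_queue and run_jobs are made up for illustration, and squaring stands in for the real computation:

```python
from multiprocessing import Process, Queue


class Worker(Process):
    def __init__(self, request_queue, result_queue):
        super(Worker, self).__init__()
        self.request_queue = request_queue
        self.result_queue = result_queue  # hypothetical second queue for results

    def run(self):
        # Expensive one-time initialization would go here (runs in the child).
        for data in iter(self.request_queue.get, None):
            # Send the input back with the result, since arrival order is arbitrary.
            self.result_queue.put((data, data * data))


def run_jobs(jobs, num_workers=4):
    """Square every job in worker processes; returns a dict {job: result}."""
    jobs = list(jobs)
    request_queue = Queue()
    result_queue = Queue()
    workers = [Worker(request_queue, result_queue) for _ in range(num_workers)]
    for worker in workers:
        worker.start()
    for data in jobs:
        request_queue.put(data)
    for _ in workers:
        request_queue.put(None)  # one sentinel per worker
    # Collect one result per job before joining, so no worker is left
    # blocked with unflushed data in the result queue.
    results = dict(result_queue.get() for _ in jobs)
    for worker in workers:
        worker.join()
    return results


if __name__ == '__main__':
    print(run_jobs(range(10)))
```

Results arrive in whatever order the workers finish, which is why each one is paired with its input here. Keying the dict by input assumes the jobs are distinct and hashable.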
That kind of thing lets each worker pay the expensive startup cost once and then amortize it over many jobs.
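If map() is preferred, Pool can also work, as long as initializer is a plain function that sets up per-process state and compute is a module-level function rather than a method. This is not part of the answer above, only a sketch of the alternative. The names init_worker, _state and square_all are hypothetical, and the dict stands in for whatever the expensive initialization produces:

```python
from multiprocessing import Pool

_state = None  # per-process state, filled in by init_worker()


def init_worker():
    """Runs once in each pool process; stands in for the expensive setup."""
    global _state
    _state = {'exponent': 2}  # hypothetical initialized resource


def compute(data):
    """Module-level function, so Pool.map() can send a reference to it."""
    return data ** _state['exponent']


def square_all(data, processes=4):
    """Map compute() over data using a pool of initialized workers."""
    pool = Pool(processes=processes, initializer=init_worker)
    try:
        # map() returns results in the same order as the input.
        return pool.map(compute, data)
    finally:
        pool.close()
        pool.join()


if __name__ == '__main__':
    print(square_all(range(10)))
```

Compared with the queue approach, this keeps map()'s ordered results, at the price of holding the initialized state in a module-level global rather than on a Worker object.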