Question
I have a 256x256x256 Numpy array, in which each element is a matrix. I need to do some calculations on each of these matrices, and I want to use the multiprocessing module to speed things up.
The results of these calculations must be stored in a 256x256x256 array like the original one, so that the result of the matrix at element [i,j,k] in the original array must be put in the [i,j,k] element of the new array.
To do this, I want to make a list which could be written in a pseudo-ish way as [array[i,j,k], (i, j, k)] and pass it to a function to be "multiprocessed". Assuming that matrices is a list of all the matrices extracted from the original array and myfunc is the function doing the calculations, the code would look somewhat like this:
import multiprocessing
import numpy as np
from itertools import izip

def myfunc(finput):
    # Do some calculations on the matrix in finput[0]...
    ...
    # ... and return the result and the index:
    return (result, finput[1])

# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)

# Make function input from the matrices and the indices:
finput = izip(matrices, inds)

pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))
However, it seems like map_async is actually creating this huge finput list first: my CPUs aren't doing much, but the memory and swap get completely consumed in a matter of seconds, which is obviously not what I want.
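A minimal way to reproduce the eager consumption in isolation (a hypothetical demo; produce and identity are made-up names, not part of my actual code):

import multiprocessing

def produce():
    # The print is a side effect that shows when each element is pulled:
    for i in range(5):
        print('producing %d' % i)
        yield i

def identity(x):
    return x

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    result = pool.map_async(identity, produce())
    # All five 'producing' lines appear before get() is ever called,
    # because map_async materializes the whole iterable up front:
    print(result.get())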
Is there a way to pass this huge list to a multiprocessing function without the need to explicitly create it first? Or do you know another way of solving this problem?
Thanks a lot! :-)
Answer
All multiprocessing.Pool.map* methods consume iterators fully (demo code) as soon as the function is called. To feed the map function chunks of the iterator one chunk at a time, use grouper_nofill:
import itertools

def grouper_nofill(n, iterable):
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
    '''
    it = iter(iterable)
    def take():
        # Yield successive n-item slices; the last slice may be shorter,
        # and an empty list means the iterator is exhausted.
        while 1:
            yield list(itertools.islice(it, n))
    # iter(callable, sentinel) stops as soon as take() yields []:
    return iter(take().next, [])
chunksize = 256
async_results = []
for finput in grouper_nofill(chunksize, itertools.izip(matrices, inds)):
    # Only chunksize (matrix, index) pairs exist in memory at a time:
    async_results.extend(pool.map_async(myfunc, finput).get())
async_results = np.array(async_results)
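The (result, index) pairs can then be scattered back into a fresh array; here is a minimal sketch, under the assumption (not stated in the original answer) that each result is a single scalar:

# Scatter each result to the position its matrix came from:
output = np.empty((256, 256, 256))
for result, (i, j, k) in async_results:
    output[i, j, k] = result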
PS. pool.map_async's chunksize parameter does something different: it breaks the iterable into chunks, then gives each chunk to a worker process which calls map(func, chunk). This can give the worker process more data to chew on if func(item) finishes too quickly, but it does not help in your situation since the iterator still gets consumed fully immediately after the map_async call is issued.
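For illustration, a simplified sketch of what chunksize changes (the dispatch pattern described in the comments is a conceptual approximation, not the real Pool internals):

# With an explicit chunksize, the pool splits the input eagerly:
#   chunks = [items[0:8], items[8:16], ...]
# and each worker then runs roughly map(myfunc, chunk).
# The split itself walks the entire iterable immediately,
# which is why it cannot cure the memory spike here.
results = pool.map_async(myfunc, finput, chunksize=8).get()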