Problem Description
I am having trouble when using Pool.map_async() (and also Pool.map()) in the multiprocessing module. I have implemented a parallel-for-loop function that works fine as long as the function input to Pool.map_async is a "regular" function. When the function is e.g. a method of a class, I get a PicklingError:
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
I use Python only for scientific computing, so I am not very familiar with the concept of pickling; I have only just learned a bit about it today. I have looked at a couple of previous answers, like Can't pickle <type 'instancemethod'> when using multiprocessing Pool.map(), but I cannot figure out how to make it work, even when following the link provided in the answer.
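For context, the failure is easy to reproduce without multiprocessing at all: pickle refuses nested (locally defined) functions, which is exactly what genNorm below returns. A minimal sketch (make_adder is an illustrative name, not from the original code):

```python
import pickle

def make_adder(n):
    # Nested function (a closure): pickle cannot look it up by name at
    # module level, so serializing it fails.
    def add(x):
        return x + n
    return add

try:
    pickle.dumps(make_adder(1))
    failed = False
except (pickle.PicklingError, AttributeError, TypeError) as e:
    failed = True
    print("cannot pickle:", e)
```

The exact exception type and message differ between Python 2 (cPickle.PicklingError, as in the traceback above) and Python 3, but the root cause is the same.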
My code, where the objective is to simulate a vector of Normal r.v.'s using multiple cores. Note that this is just an example; it may not even pay off to run it on multiple cores.
import multiprocessing as mp
import scipy as sp
import scipy.stats as spstat

def parfor(func, args, static_arg=None, nWorkers=8, chunksize=None):
    """
    Purpose: Evaluate a function using multiple cores.
    Input:
        func       - Function to evaluate in parallel
        args       - Array of arguments to evaluate func(arg)
        static_arg - The "static" argument (if any), i.e. the variables that
                     are constant in the evaluation of func.
        nWorkers   - Number of workers to process the computations.
    Output:
        func(i, static_arg) for i in args.
    """
    # Prepare arguments for func: collect arguments with the static argument (if any)
    if static_arg is not None:
        arguments = [[arg] + static_arg for arg in list(args)]
    else:
        arguments = args
    # Initialize workers
    pool = mp.Pool(processes=nWorkers)
    # Evaluate function
    result = pool.map_async(func, arguments, chunksize=chunksize)
    pool.close()
    pool.join()
    return sp.array(result.get()).flatten()

# First test function. Freeze location and scale for the Normal random-variate
# generator. This returns a nested function (a closure). Closures cannot be
# pickled, so this will give an error.
def genNorm(loc, scale):
    def subfunc(a):
        return spstat.norm.rvs(loc=loc, scale=scale, size=a)
    return subfunc

# Second test function. The same as above, but a "plain" module-level function,
# which can be pickled.
def test(fargs):
    x, a, b = fargs
    return spstat.norm.rvs(size=x, loc=a, scale=b)

# Try it out.
N = 1000000
# Set arguments to the function. args1 = [1, 1, 1, ..., 1]; the purpose is just
# to generate a random variable of size 1 for each element in the output vector.
args1 = sp.ones(N)
static_arg = [0, 1]  # standardized normal.

# This gives the PicklingError:
func = genNorm(*static_arg)
sim = parfor(func, args1, static_arg=None, nWorkers=12, chunksize=None)

# This is OK:
func = test
sim = parfor(func, args1, static_arg=static_arg, nWorkers=12, chunksize=None)
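As a side note on the genNorm pattern: in modern Python a common way to "freeze" loc and scale without a closure is functools.partial. A partial of a module-level function pickles fine, because pickle stores a reference to the underlying function plus the frozen arguments. A minimal sketch, using a plain stand-in function instead of scipy (norm_rvs is an illustrative name, not from the original code):

```python
import pickle
import random
from functools import partial

def norm_rvs(size, loc, scale):
    # Stand-in for spstat.norm.rvs: draw `size` normal variates.
    return [random.gauss(loc, scale) for _ in range(size)]

# Freeze loc and scale, like genNorm(0, 1) tried to do with a closure.
frozen = partial(norm_rvs, loc=0, scale=1)

# Unlike the closure, the partial survives a pickle round-trip, so it can
# be shipped to Pool workers as the `func` argument.
clone = pickle.loads(pickle.dumps(frozen))
print(len(clone(3)))  # 3 draws
```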
Following the link provided in the answer to Can't pickle <type 'instancemethod'> when using multiprocessing Pool.map(), Steven Bethard (almost at the end) suggests using the copy_reg module. His code is:
import copy_reg
import types

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
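One caveat worth noting: this copy_reg recipe is Python 2 specific (im_func, im_self, and cPickle no longer exist). In Python 3 the module was renamed copyreg, and bound methods pickle out of the box, so the recipe is unnecessary there. A quick check (Greeter is an illustrative class, not from the original code):

```python
import pickle

class Greeter:
    def hello(self, name):
        return "hi " + name

g = Greeter()
# In Python 3 a bound method pickles directly: pickle stores the instance
# plus the method name and rebinds them on load.
m = pickle.loads(pickle.dumps(g.hello))
print(m("world"))  # hi world
```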
I don't really understand how I can make use of this. The only thing I could come up with was putting it just before my code, but it did not help. A simple solution is of course to just go with the one that works and avoid getting involved with copy_reg. I am more interested in getting copy_reg to work properly, to take full advantage of multiprocessing without having to work around the problem each time.
Recommended Answer
The problem here is less the "pickle" error message than conceptual: multiprocessing forks your code into different "worker" processes in order to perform its magic.
It then sends data to and from the different processes by seamlessly serializing and de-serializing it (that is the part that uses pickle).
When part of the data passed back and forth is a function, it assumes a function with the same name exists in the callee process and (I guess) passes the function name as a string. Since functions are stateless, the called worker process just calls that same function with the data it has received. (Python functions can't be serialized through pickle, so just the reference is passed between the master and the worker processes.)
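This pickle-by-reference behaviour can be inspected directly: the payload of a pickled module-level function contains only the module and attribute names, not the function's code. A small check (math.sqrt is just a convenient example):

```python
import math
import pickle

payload = pickle.dumps(math.sqrt)
# Only the module name and attribute name are stored, not the code object:
assert b"math" in payload and b"sqrt" in payload

# Loading resolves the name again in the receiving process.
f = pickle.loads(payload)
print(f(9.0))  # 3.0
```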
When your function is a method on an instance, things are different underneath, even though in Python code it looks much like a plain function with an "automatic" self variable: instances (objects) are stateful. That means the worker process does not have a copy of the object that owns the method you want to call on the other side.
Working around this by passing your method as a function to the map_async call won't work either, as multiprocessing passes just a function reference, not the actual function.
So, you should either (1) change your code so that you pass a function, not a method, to the worker processes, converting whatever state the object keeps into new parameters for the call; or (2) create a "target" function for the map_async call that reconstructs the needed object on the worker-process side and then calls the method on it. Most straightforward classes in Python are picklable themselves, so you could pass the object that owns the method in the map_async call, and the "target" function would call the appropriate method on the worker side.
(2) may sound "difficult", but it is probably just something like this, unless your object's class can't be pickled:
import types

def target(obj, *args, **kw):
    method_name = args[0]
    return getattr(obj, method_name)(*args[1:])

(...)

# And add these 3 lines prior to your map_async call:
# Evaluate function
if isinstance(func, types.MethodType):
    arguments.insert(0, func.__name__)
    func = target
result = pool.map_async(func, arguments, chunksize=chunksize)
*Disclaimer: I haven't tested this.
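A runnable sketch of idea (2) in modern Python 3 may make it concrete (Scaler and the tuple layout are illustrative assumptions, not part of the original code): the picklable instance travels to the worker alongside the method name, and a module-level target rebuilds the call with getattr.

```python
from multiprocessing import Pool

class Scaler:
    """Hypothetical stateful class; instances of it are picklable."""
    def __init__(self, factor):
        self.factor = factor

    def apply(self, x):
        return self.factor * x

def target(packed):
    # Runs in the worker: unpack the (picklable) instance, the method
    # name, and the argument, then dispatch the method call.
    obj, method_name, arg = packed
    return getattr(obj, method_name)(arg)

if __name__ == "__main__":
    s = Scaler(3)
    with Pool(2) as pool:
        out = pool.map(target, [(s, "apply", x) for x in range(5)])
    print(out)  # [0, 3, 6, 9, 12]
```

The design point is that only module-level names (Scaler, target) cross the process boundary by reference; the instance's state travels as pickled data.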