Problem Description
I'm building a Python module to extract tags from a large corpus of text, and while its results are high quality it executes very slowly. I'm trying to speed the process up by using multiprocessing, and that was working too, until I tried to introduce a lock so that only one process connects to our database at a time. I can't figure out for the life of me how to make this work - despite much searching and tweaking I am still getting:

PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

Here's the offending code - it worked fine until I tried to pass a lock object as an argument to f.
from functools import partial
from multiprocessing import Manager

def make_network(initial_tag, max_tags=2, max_iter=3):
    manager = Manager()
    lock = manager.Lock()
    pool = manager.Pool(8)

    # this is a very expensive function that I would like to parallelize
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a list
    # of strings (tags) as its sole argument, and returns a list of sets with
    # entries corresponding to the input list.
    f = partial(get_more_tags, max_tags=max_tags, lock=lock)

    def _recursively_find_more_tags(tags, level):
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print i + "|" + joined
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level + 1)
        except StopIteration:
            return None

    _recursively_find_more_tags([initial_tag], 0)
Recommended Answer
Your problem is that lock objects are not picklable. I can see two possible solutions for you in that case.
- To avoid this, you can make your lock variable a global variable. Then you will be able to reference it within your pool process function directly as a global variable, and will not have to pass it as an argument to the pool process function. This works because Python uses the OS fork mechanism when creating the pool processes, and hence copies the entire contents of the process that creates the pool processes into them. This is the only way of passing a lock to a Python process created with the multiprocessing package. Incidentally, it is not necessary to use the Manager class just for this lock. With this change your code would look like this:
import multiprocessing
from functools import partial

lock = None  # Global definition of lock
pool = None  # Global definition of pool

def get_more_tags(tags, max_tags=2):
    # this is a very expensive function that I would like to parallelize
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a
    # list of strings (tags) as its sole argument, and returns a list of sets
    # with entries corresponding to the input list.
    global lock
    pass

def make_network(initial_tag, max_tags=2, max_iter=3):
    global lock
    global pool
    lock = multiprocessing.Lock()
    pool = multiprocessing.Pool(8)
    f = partial(get_more_tags, max_tags=max_tags)

    def _recursively_find_more_tags(tags, level):
        global pool
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print(i + "|" + joined)
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level + 1)
        except StopIteration:
            return None

    _recursively_find_more_tags([initial_tag], 0)
In your real code, it is possible that the lock and pool variables might be class instance variables.
- A second solution, which avoids the use of locks altogether but which might have slightly higher overhead, would be to create another process with multiprocessing.Process and connect it via a multiprocessing.Queue to each of your pool processes. This process would be responsible for running your database query. You would use the queue to allow your pool processes to send parameters to the process that manages the database query. Since all the pool processes would use the same queue, access to the database would automatically be serialized. The additional overhead would come from the pickling/unpickling of the database query arguments and the query response. Note that a queue created with multiprocessing.Manager().Queue() can be passed to a pool process as an argument (a plain multiprocessing.Queue cannot be pickled through pool.map). Note also that the multiprocessing.Lock-based solution would not work on Windows, where processes are not created with fork semantics.