Problem Description
I have been fiddling with Python's multiprocessing functionality for upwards of an hour now, trying to parallelize a rather complex graph traversal function using multiprocessing.Process and multiprocessing.Manager:
import networkx as nx
import csv
import time
from operator import itemgetter
import os
import multiprocessing as mp
cutoff = 1
exclusionlist = ["cpd:C00024"]
DG = nx.read_gml("KeggComplete.gml", relabel=True)
for exclusion in exclusionlist:
    DG.remove_node(exclusion)

# checks whether the 'memorizedPaths' directory exists, and if not, creates it
fn = os.path.join(os.path.dirname(__file__),
                  'memorizedPaths' + str(cutoff + 1))
if not os.path.exists(fn):
    os.makedirs(fn)
manager = mp.Manager()
memorizedPaths = manager.dict()
filepaths = manager.dict()
degreelist = sorted(DG.degree_iter(),
                    key=itemgetter(1),
                    reverse=True)
def _all_simple_paths_graph(item, DG, cutoff, memorizedPaths, filepaths):
    source = item[0]
    uniqueTreePaths = []
    if cutoff < 1:
        return
    visited = [source]
    stack = [iter(DG[source])]
    while stack:
        children = stack[-1]
        child = next(children, None)
        if child is None:
            stack.pop()
            visited.pop()
        elif child in memorizedPaths:
            for path in memorizedPaths[child]:
                newPath = (tuple(visited) + tuple(path))
                if (len(newPath) <= cutoff) and \
                   (len(set(visited) & set(path)) == 0):
                    uniqueTreePaths.append(newPath)
            continue
        elif len(visited) < cutoff:
            if child not in visited:
                visited.append(child)
                stack.append(iter(DG[child]))
                if visited not in uniqueTreePaths:
                    uniqueTreePaths.append(tuple(visited))
        else:  # len(visited) == cutoff:
            if (visited not in uniqueTreePaths) and \
               (child not in visited):
                uniqueTreePaths.append(tuple(visited + [child]))
            stack.pop()
            visited.pop()
    # writes the absolute path of the node's path file into the hash table
    filepaths[source] = str(fn) + "/" + str(source) + "path.txt"
    with open(filepaths[source], "wb") as csvfile2:
        writer = csv.writer(csvfile2, delimiter=" ", quotechar="|")
        for path in uniqueTreePaths:
            writer.writerow(path)
    memorizedPaths[source] = uniqueTreePaths
############################################################################
if __name__ == '__main__':
    start = time.clock()
    for item in degreelist:
        test = mp.Process(target=_all_simple_paths_graph,
                          args=(item, DG, cutoff, memorizedPaths, filepaths))
        test.start()
        test.join()
    end = time.clock()
    print (end - start)
Currently - through luck and magic - it works (sort of). My problem is that I'm only using 12 of my 24 cores.
Can someone explain why this might be the case? Perhaps my code isn't the best multiprocessing solution, or is it a feature of my architecture (an Intel Xeon CPU E5-2640 @ 2.50GHz x18, running on Ubuntu 13.04 x64)?
I managed to get:
p = mp.Pool()
for item in degreelist:
    p.apply_async(_all_simple_paths_graph,
                  args=(item, DG, cutoff, memorizedPaths, filepaths))
p.close()
p.join()
working. However, it's VERY SLOW! So I assume I'm using the wrong function for the job. Hopefully this helps clarify exactly what I'm trying to accomplish!
A .map attempt:
from functools import partial

partialfunc = partial(_all_simple_paths_graph,
                      DG=DG,
                      cutoff=cutoff,
                      memorizedPaths=memorizedPaths,
                      filepaths=filepaths)
p = mp.Pool()
for item in processList:
    processVar = p.map(partialfunc, xrange(len(processList)))
p.close()
p.join()
It works, but it's slower than single-core. Time to optimize!
Recommended Answer
Too much is piling up here to address in comments, so, where mp is multiprocessing:
mp.cpu_count() should return the number of processors. But test it. Some platforms are funky, and this info isn't always easy to get. Python does the best it can.
If you start 24 processes, they'll do exactly what you tell them to do ;-) It looks like mp.Pool() would be most convenient for you. You pass the number of processes you want to create to its constructor. mp.Pool(processes=None) will use mp.cpu_count() for the number of processors.
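As a quick check (a minimal sketch, not code from the answer), the two pieces fit together like this:

import multiprocessing as mp

if __name__ == "__main__":
    n = mp.cpu_count()               # how many processors Python can see
    print("detected %d CPUs" % n)    # expected to be 24 on the machine described above
    pool = mp.Pool(processes=None)   # same as mp.Pool(): one worker per CPU
    pool.close()
    pool.join()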
Then you can use, for example, .imap_unordered(...) on your Pool instance to spread your degreelist across processes. Or maybe some other Pool method would work better for you - experiment.
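For illustration, a rough sketch of that pattern (the work function and the item list are made-up placeholders, not the real traversal code):

import multiprocessing as mp

def work(item):
    return item * item                     # stand-in for the real per-node work

if __name__ == "__main__":
    items = range(1000)                    # stand-in for degreelist
    pool = mp.Pool()                       # defaults to one worker per CPU
    for result in pool.imap_unordered(work, items, chunksize=50):
        pass                               # results come back in completion order
    pool.close()
    pool.join()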
If you can't bash the problem into Pool's view of the world, you could instead create an mp.Queue to use as a work queue, .put()'ing nodes (or slices of nodes, to reduce overhead) in the main program, and writing the workers to .get() work items off that queue. Ask if you need examples - a rough sketch of the idea is below. Note that you need to put sentinel values (one per process) on the queue, after all the "real" work items, so that worker processes can test for the sentinel to know when they're done.
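Here is one possible shape of that queue-based setup - a sketch only, with illustrative names (worker, NUM_WORKERS, SENTINEL) and a placeholder where the real traversal would go:

import multiprocessing as mp

NUM_WORKERS = mp.cpu_count()
SENTINEL = None                        # marks the end of the work stream

def worker(q):
    while True:
        item = q.get()
        if item is SENTINEL:           # one sentinel per worker process
            break
        # ... call the real traversal on `item` here ...

if __name__ == "__main__":
    q = mp.Queue()
    procs = [mp.Process(target=worker, args=(q,)) for _ in range(NUM_WORKERS)]
    for p in procs:
        p.start()
    for item in range(100):            # stand-in for the nodes / node slices
        q.put(item)
    for _ in procs:                    # sentinels go on after all real items
        q.put(SENTINEL)
    for p in procs:
        p.join()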
FYI, I like queues because they're more explicit. Many others like Pools better because they're more magical ;-)
Here's an executable prototype for you. This shows one way to use imap_unordered with Pool and chunksize that doesn't require changing any function signatures. Of course you'll have to plug in your real code ;-) Note that the init_worker approach allows passing "most of" the arguments only once per processor, not once for every item in your degreelist. Cutting the amount of inter-process communication can be crucial for speed.
import multiprocessing as mp

def init_worker(mps, fps, cut):
    global memorizedPaths, filepaths, cutoff
    global DG
    print "process initializing", mp.current_process()
    memorizedPaths, filepaths, cutoff = mps, fps, cut
    DG = 1  ## nx.read_gml("KeggComplete.gml", relabel=True)

def work(item):
    _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths)

def _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths):
    pass  # print "doing " + str(item)

if __name__ == "__main__":
    m = mp.Manager()
    memorizedPaths = m.dict()
    filepaths = m.dict()
    cutoff = 1  ##
    # use all available CPUs
    p = mp.Pool(initializer=init_worker, initargs=(memorizedPaths,
                                                   filepaths,
                                                   cutoff))
    degreelist = range(100000)  ##
    for _ in p.imap_unordered(work, degreelist, chunksize=500):
        pass
    p.close()
    p.join()
I strongly advise running this exactly as-is, so you can see that it's blazing fast. Then add things to it a bit at a time, to see how that affects the time. For example, just adding
memorizedPaths[item] = item
to _all_simple_paths_graph() slows it down enormously. Why? Because the dict gets bigger and bigger with each addition, and this process-safe dict has to be synchronized (under the covers) among all the processes. The unit of synchronization is "the entire dict" - there's no internal structure the mp machinery can exploit to do incremental updates to the shared dict.
If you can't afford this expense, then you can't use a Manager.dict() for this. Opportunities for cleverness abound ;-)
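One such option (my illustration, not code from the answer): have each worker return its piece of the result and let the parent merge everything into an ordinary dict, so nothing needs to be synchronized while the workers run:

import multiprocessing as mp

def work(item):
    paths = [(item,)]                   # stand-in for the paths computed for this item
    return item, paths

if __name__ == "__main__":
    memorizedPaths = {}                 # plain dict, owned by the parent process only
    pool = mp.Pool()
    for source, paths in pool.imap_unordered(work, range(1000), chunksize=100):
        memorizedPaths[source] = paths  # merging happens in one process, no locking needed
    pool.close()
    pool.join()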