問題描述
我在 python 中遇到以下問題.
I'm having the following problem in python.
我需要并行進行一些計算,其結果需要按順序寫入文件中.所以我創建了一個接收 multiprocessing.Queue
和文件句柄的函數,進行計算并將結果打印到文件中:
I need to do some calculations in parallel whose results I need to be written sequentially in a file. So I created a function that receives a multiprocessing.Queue
and a file handle, do the calculation and print the result in the file:
import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation
# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file
def work(queue, fh):
while True:
try:
parameter = queue.get(block = False)
result = doCalculation(parameter)
print >>fh, string
except:
break
if __name__ == "__main__":
nthreads = multiprocessing.cpu_count()
fh = open("foo", "w")
workQueue = Queue()
parList = # list of conditions for which I want to run doCalculation()
for x in parList:
workQueue.put(x)
processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)]
for p in processes:
p.start()
for p in processes:
p.join()
fh.close()
但腳本運行后文件最終為空.我試圖將 worker() 函數更改為:
But the file ends up empty after the script runs. I tried to change the worker() function to:
def work(queue, filename):
while True:
try:
fh = open(filename, "a")
parameter = queue.get(block = False)
result = doCalculation(parameter)
print >>fh, string
fh.close()
except:
break
并將文件名作為參數傳遞.然后它按我的意圖工作.當我嘗試按順序執行相同的操作時,沒有多處理,它也可以正常工作.
and pass the filename as parameter. Then it works as I intended. When I try to do the same thing sequentially, without multiprocessing, it also works normally.
為什么它在第一個版本中不起作用?我看不出問題.
Why it didn't worked in the first version? I can't see the problem.
另外:我可以保證兩個進程不會同時嘗試寫入文件嗎?
Also: can I guarantee that two processes won't try to write the file simultaneously?
謝謝.我現在明白了.這是工作版本:
Thanks. I got it now. This is the working version:
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform
def doCalculation(par):
t = uniform(0,2)
sleep(t)
return par * par # just to simulate some calculation
def feed(queue, parlist):
for par in parlist:
queue.put(par)
def calc(queueIn, queueOut):
while True:
try:
par = queueIn.get(block = False)
print "dealing with ", par, ""
res = doCalculation(par)
queueOut.put((par,res))
except:
break
def write(queue, fname):
fhandle = open(fname, "w")
while True:
try:
par, res = queue.get(block = False)
print >>fhandle, par, res
except:
break
fhandle.close()
if __name__ == "__main__":
nthreads = multiprocessing.cpu_count()
fname = "foo"
workerQueue = Queue()
writerQueue = Queue()
parlist = [1,2,3,4,5,6,7,8,9,10]
feedProc = Process(target = feed , args = (workerQueue, parlist))
calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
writProc = Process(target = write, args = (writerQueue, fname))
feedProc.start()
for p in calcProc:
p.start()
writProc.start()
feedProc.join ()
for p in calcProc:
p.join()
writProc.join ()
推薦答案
你真的應該使用兩個隊列和三種不同的處理方式.
You really should use two queues and three separate kinds of processing.
將東西放入隊列 #1.
Put stuff into Queue #1.
從 Queue #1 中取出東西并進行計算,然后將東西放入 Queue #2.您可以擁有其中的許多,因為它們從一個隊列中取出并安全地放入另一個隊列.
Get stuff out of Queue #1 and do calculations, putting stuff in Queue #2. You can have many of these, since they get from one queue and put into another queue safely.
從 Queue #2 中取出內容并將其寫入文件.您必須恰好擁有其中的 1 個,僅此而已.它擁有"文件,保證原子訪問,并絕對保證文件被干凈和一致地寫入.
Get stuff out of Queue #2 and write it to a file. You must have exactly 1 of these and no more. It "owns" the file, guarantees atomic access, and absolutely assures that the file is written cleanly and consistently.
這篇關于使用多處理寫入文件的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!