Problem description
I'm trying to process a file (every line is a JSON document). The size of the file can go up to hundreds of MBs to GBs, so I wrote a generator to fetch each document from the file line by line.
import codecs
import json

def jl_file_iterator(file):
    with codecs.open(file, 'r', 'utf-8') as f:
        for line in f:
            document = json.loads(line)
            yield document
My system has 4 cores, so I would like to process 4 lines of the file in parallel. Currently I have this code, which takes 4 lines at a time and calls the code for parallel processing:
threads = 4
files, i = [], 1
for jl in jl_file_iterator(input_path):
    files.append(jl)
    if i % threads == 0:
        # pool.map(processFile, files)
        parallelProcess(files, o)
        files = []
    i += 1

if files:
    parallelProcess(files, o)
    files = []
This is my code where the actual processing happens:
from multiprocessing import Process

def parallelProcess(files, outfile):
    processes = []
    for i in range(len(files)):
        p = Process(target=processFile, args=(files[i],))
        processes.append(p)
        p.start()
    for i in range(len(files)):
        processes[i].join()

def processFile(doc):
    extractors = {}
    ... do some processing on doc
    o.write(json.dumps(doc) + '\n')
As you can see, I wait for all 4 lines to finish processing before I send the next 4 files to process. But what I would like to do is: as soon as one process finishes processing a file, I want the next line to be assigned to the released processor. How do I do that?
PS: The problem is that since it's a generator, I cannot load all the files and use something like map to run the processes.
Thanks for your help.
Recommended answer
As @pvg said in a comment, a (bounded) queue is the natural way to mediate among a producer and consumers with different speeds, ensuring they all stay as busy as possible but without letting the producer get way ahead.
Here's a self-contained, executable example. The queue is restricted to a maximum size equal to the number of worker processes. If the consumers run much faster than the producer, it could make good sense to let the queue get bigger than that.
In your specific case, it would probably make sense to pass lines to the consumers and let them do the document = json.loads(line) part in parallel (a sketch adapted along those lines follows the example below).
import multiprocessing as mp

NCORE = 4

def process(q, iolock):
    from time import sleep
    while True:
        stuff = q.get()
        if stuff is None:
            break
        with iolock:
            print("processing", stuff)
        sleep(stuff)

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock))
    for stuff in range(20):
        q.put(stuff)  # blocks until q below its max size
        with iolock:
            print("queued", stuff)
    for _ in range(NCORE):  # tell workers we're done
        q.put(None)
    pool.close()
    pool.join()
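Adapting that pattern to the question is mostly a matter of putting raw lines on the queue and letting the workers do the json.loads (plus the rest of the per-document work). Here is a minimal sketch under those assumptions; the input/output paths and the placeholder processing step are illustrative, not taken from the original code:

import json
import multiprocessing as mp

NCORE = 4

def process(q, iolock):
    # Hypothetical worker: each one parses and processes lines pulled from the queue.
    with open('output.jl', 'a', encoding='utf-8') as out:
        while True:
            line = q.get()
            if line is None:  # sentinel: no more work coming
                break
            doc = json.loads(line)  # parsing happens in the workers, in parallel
            # ... do some processing on doc ...
            with iolock:
                out.write(json.dumps(doc) + '\n')
                out.flush()  # flush while holding the lock so lines don't interleave

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)  # bounded: the reader blocks when workers fall behind
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock))
    with open('input.jl', 'r', encoding='utf-8') as f:
        for line in f:
            q.put(line)  # blocks once NCORE lines are already queued
    for _ in range(NCORE):  # one sentinel per worker
        q.put(None)
    pool.close()
    pool.join()

Serializing writes through iolock keeps output lines from different workers intact; if the per-document output were large, a single dedicated writer process (or collecting results through a second queue) would likely scale better.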