I am trying to create a class that can run a separate process to go do some work that takes a long time, launch a bunch of these from a main module, and then wait for them all to finish. I want to launch the processes once and then keep feeding them things to do rather than creating and destroying processes. For example, maybe I have 10 servers running the dd command, then I want them all to scp a file, etc.
My ultimate goal is to create a class for each system that keeps track of the information for the system it is tied to, like IP address, logs, runtime, etc. But that class must be able to launch a system command and then return execution back to the caller while that system command runs, so it can follow up with the result of the system command later.
My attempt is failing because I cannot send an instance method of a class over the pipe to the subprocess via pickle. Those are not picklable. I therefore tried to fix it in various ways, but I can't figure it out. How can my code be patched to do this? What good is multiprocessing if you can't send over anything useful?
Is there any good documentation of multiprocessing being used with class instances? The only way I can get the multiprocessing module to work is on simple functions. Every attempt to use it within a class instance has failed. Maybe I should pass events instead? I don't understand how to do that yet.
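For reference, the plain-function case that I can get working looks roughly like this (a minimal sketch; long_command is just a stand-in for my real command runner, not part of the class design below):

import multiprocessing

def long_command(command):
    # Stand-in for the real long-running system call
    return "ran: %s" % command

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=2)
    results = pool.map(long_command, ["ls /", "df -h"])
    pool.close()
    pool.join()
    print results   # works, because a module-level function pickles cleanly

My actual attempt with class instances is below: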
import multiprocessing
import sys
import re


class ProcessWorker(multiprocessing.Process):
    """
    This class runs as a separate process to execute worker's commands in parallel
    Once launched, it remains running, monitoring the task queue, until "None" is sent
    """

    def __init__(self, task_q, result_q):
        multiprocessing.Process.__init__(self)
        self.task_q = task_q
        self.result_q = result_q
        return

    def run(self):
        """
        Overloaded function provided by multiprocessing.Process. Called upon start() signal
        """
        proc_name = self.name
        print '%s: Launched' % (proc_name)
        while True:
            next_task_list = self.task_q.get()
            if next_task_list is None:
                # Poison pill means shutdown
                print '%s: Exiting' % (proc_name)
                self.task_q.task_done()
                break
            next_task = next_task_list[0]
            print '%s: %s' % (proc_name, next_task)
            args = next_task_list[1]
            kwargs = next_task_list[2]
            answer = next_task(*args, **kwargs)
            self.task_q.task_done()
            self.result_q.put(answer)
        return
# End of ProcessWorker class


class Worker(object):
    """
    Launches a child process to run commands from derived classes in separate processes,
    which sit and listen for something to do
    This base class is called by each derived worker
    """

    def __init__(self, config, index=None):
        self.config = config
        self.index = index

        # Launch the ProcessWorker for anything that has an index value
        if self.index is not None:
            self.task_q = multiprocessing.JoinableQueue()
            self.result_q = multiprocessing.Queue()

            self.process_worker = ProcessWorker(self.task_q, self.result_q)
            self.process_worker.start()
            print "Got here"
            # Process should be running and listening for functions to execute
        return

    def enqueue_process(target):  # No self, since it is a decorator
        """
        Used to place a command target from this class object into the task_q
        NOTE: Any function decorated with this must use fetch_results() to get the
        target task's result value
        """
        def wrapper(self, *args, **kwargs):
            self.task_q.put([target, args, kwargs])  # FAIL: target is a class instance method and can't be pickled!
        return wrapper

    def fetch_results(self):
        """
        After all processes have been spawned by multiple modules, this command
        is called on each one to retrieve the results of the call.
        This blocks until the execution of the item in the queue is complete
        """
        self.task_q.join()          # Wait for it to finish
        return self.result_q.get()  # Return the result

    @enqueue_process
    def run_long_command(self, command):
        print "I am running %s as process %s" % (command, self.name)
        # In here, I will launch a subprocess to run a long-running system command
        # p = Popen(command), etc
        # p.wait(), etc
        return

    def close(self):
        self.task_q.put(None)
        self.task_q.join()


if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(5):
        worker = Worker(config, index)
        worker.run_long_command("ls /")
        workers.append(worker)
    for worker in workers:
        worker.fetch_results()

    # Do more work... (this would actually be done in a distributor in another class)

    for worker in workers:
        worker.close()
Edit: I tried to move the ProcessWorker
class and the creation of the multiprocessing queues outside of the Worker
class and then tried to manually pickle the worker instance. Even that doesn't work and I get an error
RuntimeError: Queue objects should only be shared between processes through inheritance
. But I am only passing references to those queues into the worker instance?? I am missing something fundamental. Here is the modified code from the main section:
if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(1):
        task_q = multiprocessing.JoinableQueue()
        result_q = multiprocessing.Queue()
        process_worker = ProcessWorker(task_q, result_q)
        worker = Worker(config, index, process_worker, task_q, result_q)
        something_to_look_at = pickle.dumps(worker)  # FAIL: Doesn't like queues??
        process_worker.start()
        worker.run_long_command("ls /")
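The failure seems to come from the queue itself rather than from my Worker wrapper; this minimal check (just pickle and multiprocessing, nothing else from my code) reproduces the same RuntimeError:

import multiprocessing
import pickle

if __name__ == '__main__':
    q = multiprocessing.Queue()
    try:
        pickle.dumps(q)
    except RuntimeError as err:
        # RuntimeError: Queue objects should only be shared between processes through inheritance
        print err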
Instead of attempting to send a method itself (which is impractical), try sending a name of a method to execute.
Provided that each worker runs the same code, it's a matter of a simple getattr(self, task_name)
.
I'd pass tuples (task_name, task_args)
, where task_args
is a dict to be directly fed to the task method:
next_task_name, next_task_args = self.task_q.get()
if next_task_name:
    task = getattr(self, next_task_name)
    answer = task(**next_task_args)
    ...
else:
    # poison pill, shut down
    break
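On the sending side, the decorator from the question would then only need to enqueue picklable data. A rough sketch (assuming the decorated methods are also available on the class whose run() loop does the getattr, and that callers pass keyword arguments):

def enqueue_process(target):              # still a decorator, no self
    def wrapper(self, *args, **kwargs):
        # Enqueue only the method name and a kwargs dict; both pickle cleanly.
        self.task_q.put((target.__name__, kwargs))
    return wrapper

A call such as worker.run_long_command(command="ls /") would then put ('run_long_command', {'command': 'ls /'}) on the queue instead of a bound method.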