

How to use multiprocessing with class instances in Python?

                Problem description


                I am trying to create a class that can run a separate process to do some work that takes a long time, launch a bunch of these from a main module, and then wait for them all to finish. I want to launch the processes once and then keep feeding them things to do rather than creating and destroying processes. For example, maybe I have 10 servers running the dd command, then I want them all to scp a file, etc.

                My ultimate goal is to create a class for each system that keeps track of the information for the system it is tied to, like IP address, logs, runtime, etc. But that class must be able to launch a system command and then return execution to the caller while the command runs, so the caller can follow up with the result of the system command later.

                My attempt is failing because I cannot send an instance method of a class over the pipe to the subprocess via pickle. Those are not pickleable. I therefore tried to fix it various ways but I can't figure it out. How can my code be patched to do this? What good is multiprocessing if you can't send over anything useful?

                Is there any good documentation of multiprocessing being used with class instances? The only way I can get the multiprocessing module to work is on simple functions. Every attempt to use it within a class instance has failed. Maybe I should pass events instead? I don't understand how to do that yet.

                import multiprocessing
                import sys
                import re
                
                class ProcessWorker(multiprocessing.Process):
                    """
                    This class runs as a separate process to execute worker's commands in parallel
                    Once launched, it remains running, monitoring the task queue, until "None" is sent
                    """
                
                    def __init__(self, task_q, result_q):
                        multiprocessing.Process.__init__(self)
                        self.task_q = task_q
                        self.result_q = result_q
                        return
                
                    def run(self):
                        """
                        Overloaded function provided by multiprocessing.Process.  Called upon start() signal
                        """
                        proc_name = self.name
                        print '%s: Launched' % (proc_name)
                        while True:
                            next_task_list = self.task_q.get()
                            if next_task_list is None:
                                # Poison pill means shutdown
                                print '%s: Exiting' % (proc_name)
                                self.task_q.task_done()
                                break
                            next_task = next_task_list[0]
                            print '%s: %s' % (proc_name, next_task)
                            args = next_task_list[1]
                            kwargs = next_task_list[2]
                            answer = next_task(*args, **kwargs)
                            self.task_q.task_done()
                            self.result_q.put(answer)
                        return
                # End of ProcessWorker class
                
                class Worker(object):
                    """
                    Launches a child process to run commands from derived classes in separate processes,
                    which sit and listen for something to do
                    This base class is called by each derived worker
                    """
                    def __init__(self, config, index=None):
                        self.config = config
                        self.index = index
                
                        # Launch the ProcessWorker for anything that has an index value
                        if self.index is not None:
                            self.task_q = multiprocessing.JoinableQueue()
                            self.result_q = multiprocessing.Queue()
                
                            self.process_worker = ProcessWorker(self.task_q, self.result_q)
                            self.process_worker.start()
                            print "Got here"
                            # Process should be running and listening for functions to execute
                        return
                
                    def enqueue_process(target):  # No self, since it is a decorator
                        """
                        Used to place a command target from this class object into the task_q
                        NOTE: Any function decorated with this must use fetch_results() to get the
                        target task's result value
                        """
                        def wrapper(self, *args, **kwargs):
                            self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled!
                        return wrapper
                
                    def fetch_results(self):
                        """
                        After all processes have been spawned by multiple modules, this command
                        is called on each one to retrieve the results of the call.
                        This blocks until the execution of the item in the queue is complete
                        """
                        self.task_q.join()                          # Wait for it to finish
                        return self.result_q.get()                  # Return the result
                
                    @enqueue_process
                    def run_long_command(self, command):
                        print "I am running number % as process "%number, self.name
                
                        # In here, I will launch a subprocess to run a  long-running system command
                        # p = Popen(command), etc
                        # p.wait(), etc
                        return 
                
                    def close(self):
                        self.task_q.put(None)
                        self.task_q.join()
                
                if __name__ == '__main__':
                    config = ["some value", "something else"]
                    index = 7
                    workers = []
                    for i in range(5):
                        worker = Worker(config, index)
                        worker.run_long_command("ls /")
                        workers.append(worker)
                    for worker in workers:
                        worker.fetch_results()
                
                    # Do more work... (this would actually be done in a distributor in another class)
                
                    for worker in workers:
                        worker.close() 
                

                Edit: I tried to move the ProcessWorker class and the creation of the multiprocessing queues outside of the Worker class and then tried to manually pickle the worker instance. Even that doesn't work and I get an error

                RuntimeError: Queue objects should only be shared between processes through inheritance

                But I am only passing references to those queues into the worker instance?? I am missing something fundamental. Here is the modified code from the main section:

                if __name__ == '__main__':
                    config = ["some value", "something else"]
                    index = 7
                    workers = []
                    for i in range(1):
                        task_q = multiprocessing.JoinableQueue()
                        result_q = multiprocessing.Queue()
                        process_worker = ProcessWorker(task_q, result_q)
                        worker = Worker(config, index, process_worker, task_q, result_q)
                        something_to_look_at = pickle.dumps(worker) # FAIL:  Doesn't like queues??
                        process_worker.start()
                        worker.run_long_command("ls /")
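                The RuntimeError here is multiprocessing refusing to pickle the queue directly: a Queue must reach a child process by being passed through the Process constructor (so the child inherits it when the process is created), not by being pickled explicitly. A minimal Python 3 sketch, separate from the question's code, showing both the allowed path and the rejected one:

```python
import multiprocessing
import pickle

def reader(q):
    # The queue arrived via the Process constructor, i.e. by inheritance: allowed
    print(q.get())

if __name__ == '__main__':
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=reader, args=(q,))  # sharing by inheritance
    p.start()
    q.put('hello')
    p.join()

    try:
        pickle.dumps(q)  # explicit pickling outside of process creation: rejected
    except RuntimeError as err:
        print(err)  # -> Queue objects should only be shared between processes through inheritance
```

So pickling the Worker fails not because the queue references are wrong, but because a Queue by design cannot travel inside a pickle at all.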
                

                Solution

                Instead of attempting to send a method itself (which is impractical), try sending the name of the method to execute.

                Provided that each worker runs the same code, it's a matter of a simple getattr(self, task_name).

                I'd pass tuples (task_name, task_args), where task_args is a dict to be fed directly to the task method:

                next_task_name, next_task_args = self.task_q.get()
                if next_task_name:
                    task = getattr(self, next_task_name)
                    answer = task(**next_task_args)
                    ...
                else:
                    # poison pill, shut down
                    break
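Putting the answer's idea together: a self-contained Python 3 sketch (the class and method names follow the question, but the exact wiring is illustrative, not the asker's code) in which only plain, picklable data crosses the pipe, namely a method-name string and a dict of arguments, and the child resolves the method with getattr:

```python
import multiprocessing

class ProcessWorker(multiprocessing.Process):
    """Child process: pulls (task_name, task_args) tuples off task_q until a poison pill."""

    def __init__(self, task_q, result_q):
        super().__init__()
        self.task_q = task_q
        self.result_q = result_q

    def run(self):
        while True:
            next_task_name, next_task_args = self.task_q.get()
            if next_task_name is None:            # poison pill: shut down
                self.task_q.task_done()
                break
            task = getattr(self, next_task_name)  # resolve the method by name
            self.result_q.put(task(**next_task_args))
            self.task_q.task_done()

    def run_long_command(self, command):
        # Stand-in for launching the long-running system command
        return '%s ran: %s' % (self.name, command)

if __name__ == '__main__':
    task_q = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    worker = ProcessWorker(task_q, result_q)
    worker.start()

    # Only strings and dicts go through the queue, so pickling never fails
    task_q.put(('run_long_command', {'command': 'ls /'}))
    task_q.join()                 # block until the task is done
    print(result_q.get())

    task_q.put((None, None))      # poison pill
    worker.join()
```

Because the queues are attached to the Process subclass before start(), the child inherits them legitimately, and between put() and join() the main process is free to keep feeding more work to long-lived workers, which is what the asker wanted.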
                


                【網(wǎng)站聲明】本站部分內(nèi)容來源于互聯(lián)網(wǎng),旨在幫助大家更快的解決問題,如果有圖片或者內(nèi)容侵犯了您的權(quán)益,請聯(lián)系我們刪除處理,感謝您的支持!

                相關(guān)文檔推薦

                What exactly is Python multiprocessing Module#39;s .join() Method Doing?(Python 多處理模塊的 .join() 方法到底在做什么?)
                Passing multiple parameters to pool.map() function in Python(在 Python 中將多個參數(shù)傳遞給 pool.map() 函數(shù))
                multiprocessing.pool.MaybeEncodingError: #39;TypeError(quot;cannot serialize #39;_io.BufferedReader#39; objectquot;,)#39;(multiprocessing.pool.MaybeEncodingError: TypeError(cannot serialize _io.BufferedReader object,)) - IT屋-程序員軟件開
                Python Multiprocess Pool. How to exit the script when one of the worker process determines no more work needs to be done?(Python 多進(jìn)程池.當(dāng)其中一個工作進(jìn)程確定不再需要完成工作時,如何退出腳本?) - IT屋-程序員
                How do you pass a Queue reference to a function managed by pool.map_async()?(如何將隊(duì)列引用傳遞給 pool.map_async() 管理的函數(shù)?)
                yet another confusion with multiprocessing error, #39;module#39; object has no attribute #39;f#39;(與多處理錯誤的另一個混淆,“模塊對象沒有屬性“f)
