Question
As almost everyone is aware when they first look at threading in Python, there is the GIL that makes life miserable for people who actually want to do processing in parallel - or at least give it a chance.
I am currently looking at implementing something like the Reactor pattern. Effectively I want to listen for incoming socket connections on one thread-like, and when someone tries to connect, accept that connection and pass it along to another thread-like for processing.
I'm not (yet) sure what kind of load I might be facing. I know there is currently a 2MB cap set up on incoming messages. Theoretically we could get thousands per second (though I don't know if practically we've seen anything like that). The amount of time spent processing a message isn't terribly important, though obviously quicker would be better.
I was looking into the Reactor pattern, and developed a small example using the multiprocessing library that (at least in testing) seems to work just fine. However, now/soon we'll have the asyncio library available, which would handle the event loop for me.
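The question's multiprocessing example isn't shown, but a reactor-style accept loop of the kind described might look roughly like this (a minimal sketch; handle_connection and the socket details are illustrative, not the original code):

```python
import socket
import multiprocessing

def handle_connection(data: bytes) -> bytes:
    # Stand-in for real message processing (illustrative).
    return data.upper()

def serve(host="127.0.0.1", port=9000):
    # Accept loop: the main process only accepts and dispatches;
    # a process pool does the actual work, sidestepping the GIL.
    with multiprocessing.Pool() as pool:
        with socket.create_server((host, port)) as srv:
            while True:
                conn, _addr = srv.accept()
                with conn:
                    data = conn.recv(2 * 1024 * 1024)  # 2MB cap, as in the question
                    result = pool.apply_async(handle_connection, (data,))
                    conn.sendall(result.get())
```

Here the accept loop blocks on result.get(), which is fine without an event loop but is exactly what must not happen once asyncio is involved, as the answer below explains.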
Is there anything that could bite me by combining asyncio and multiprocessing?
Answer
You should be able to safely combine asyncio and multiprocessing without too much trouble, though you shouldn't be using multiprocessing directly. The cardinal sin of asyncio (and any other event-loop based asynchronous framework) is blocking the event loop. If you try to use multiprocessing directly, any time you block to wait for a child process, you're going to block the event loop. Obviously, this is bad.
The simplest way to avoid this is to use BaseEventLoop.run_in_executor to execute a function in a concurrent.futures.ProcessPoolExecutor. ProcessPoolExecutor is a process pool implemented using multiprocessing.Process, but asyncio has built-in support for executing a function in it without blocking the event loop. Here's a simple example:
import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
    time.sleep(x)  # Pretend this is expensive calculations
    return x * 5

@asyncio.coroutine
def main():
    #pool = multiprocessing.Pool()
    #out = pool.apply(blocking_func, args=(10,))  # This blocks the event loop.
    executor = ProcessPoolExecutor()
    out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
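The @asyncio.coroutine / yield from style above reflects when this was written; on Python 3.5+ the same example is usually written with async/await and asyncio.run (a sketch with the same behavior, using a shorter sleep):

```python
import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
    time.sleep(x)  # Pretend this is expensive calculations
    return x * 5

async def main():
    loop = asyncio.get_running_loop()
    # The call runs in a child process; the event loop stays free meanwhile.
    with ProcessPoolExecutor() as executor:
        out = await loop.run_in_executor(executor, blocking_func, 1)
    print(out)
    return out

if __name__ == "__main__":
    asyncio.run(main())
```

Note that asyncio.get_running_loop and asyncio.run require Python 3.7+; the executor is also closed deterministically by the with block here.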
For the majority of cases, this function alone is good enough. If you find yourself needing other constructs from multiprocessing, like Queue, Event, Manager, etc., there is a third-party library called aioprocessing (full disclosure: I wrote it), that provides asyncio-compatible versions of all the multiprocessing data structures. Here's an example demoing that:
import time
import asyncio
import aioprocessing
import multiprocessing

def func(queue, event, lock, items):
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item + 5)
    queue.close()

@asyncio.coroutine
def example(queue, event, lock):
    l = [1, 2, 3, 4, 5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
    p.start()
    while True:
        result = yield from queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
    yield from event.coro_wait()
    with (yield from lock):
        yield from queue.coro_put(78)
        yield from queue.coro_put(None)  # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [
        asyncio.async(example(queue, event, lock)),
        asyncio.async(example2(queue, event, lock)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
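For the many-messages-per-second scenario in the question, the run_in_executor approach also scales to a whole batch without third-party help: schedule every message on the pool and await the results together with asyncio.gather (a sketch; process_message is a hypothetical stand-in for the real per-message work):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def process_message(msg: bytes) -> int:
    # Hypothetical per-message work; replace with real processing.
    return len(msg)

async def handle_all(messages):
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        # All messages run on the pool concurrently; the event loop
        # is never blocked while waiting for any of them.
        futures = [loop.run_in_executor(executor, process_message, m)
                   for m in messages]
        return await asyncio.gather(*futures)

if __name__ == "__main__":
    print(asyncio.run(handle_all([b"a", b"bb", b"ccc"])))  # [1, 2, 3]
```

asyncio.gather preserves input order, so results line up with the messages even though the pool may finish them out of order.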