Problem Description
I am using the I/O non-blocking python server Tornado. I have a class of GET requests which may take a significant amount of time to complete (think in the range of 5-10 seconds). The problem is that Tornado blocks on these requests so that subsequent fast requests are held up until the slow request completes.
I looked at: https://github.com/facebook/tornado/wiki/Threading-and-concurrency and came to the conclusion that I wanted some combination of #3 (other processes) and #4 (other threads). #4 on its own had issues: I was unable to get reliable control back to the ioloop when there was another thread doing the "heavy_lifting". (I assume that this was due to the GIL and the fact that the heavy_lifting task has high CPU load and keeps pulling control away from the main ioloop, but that's a guess.)
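(For illustration, here is a minimal sketch of the thread-only variant (#4) that I could not get to behave well; NaiveThreadedHandler and heavy_lifting_inline are hypothetical stand-ins for my real code. The structure is the conventional one, but because the worker is CPU-bound and holds the GIL, the ioloop thread is starved while it runs:)

import math
import threading

from tornado.ioloop import IOLoop
from tornado.web import RequestHandler, asynchronous


def heavy_lifting_inline():
    # hypothetical stand-in for the real CPU-bound work
    for k in range(2000):
        math.factorial(k)
    return 'slow result'


class NaiveThreadedHandler(RequestHandler):
    @asynchronous
    def get(self):
        # keep the request open and push the work onto a plain thread
        threading.Thread(target=self._work).start()

    def _work(self):
        # runs on the worker thread; CPU-bound work like this holds the
        # GIL, which is what starves the ioloop thread
        result = heavy_lifting_inline()
        # add_callback is the thread-safe way to get back onto the ioloop
        IOLoop.instance().add_callback(lambda: self._finish_up(result))

    def _finish_up(self, result):
        self.write(result)
        self.finish()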
So I have been prototyping how to solve this by doing the "heavy lifting" tasks within these slow GET requests in a separate process, and then placing a callback back into the Tornado ioloop when the process is done to finish the request. This frees up the ioloop to handle other requests.
I have created a simple example demonstrating a possible solution, but am curious to get feedback from the community on it.
My question is two-fold: how can the current approach be simplified, and what pitfalls potentially exist with it?

My approach:
1. Utilize Tornado's builtin asynchronous decorator, which allows a request to stay open and the ioloop to continue.
2. Spawn a separate process for "heavy lifting" tasks using python's multiprocessing module. I first attempted to use the threading module but was unable to get any reliable relinquishing of control back to the ioloop. It also appears that multiprocessing would take advantage of multiple cores.
3. Start a 'watcher' thread in the main ioloop process using the threading module, whose job is to watch a multiprocessing.Queue for the results of the "heavy lifting" task when it completes. This was needed because I needed a way to know that the heavy_lifting task had completed while still being able to notify the ioloop that this request was now finished.
4. Be sure that the 'watcher' thread relinquishes control to the main ioloop often with time.sleep(0) calls so that other requests continue to be processed readily. (A blocking alternative to this busy-wait is sketched after this list.)
5. When there is a result in the queue, add a callback from the "watcher" thread using tornado.ioloop.IOLoop.instance().add_callback(), which is documented to be the only safe way to call ioloop instances from other threads.
6. Be sure to then call finish() in the callback to complete the request and hand over a reply.
Below is some sample code showing this approach. multi_tornado.py is the server implementing the above outline and call_multi.py is a sample script that calls the server in two different ways to test it. Both tests call the server with 3 slow GET requests followed by 20 fast GET requests. The results are shown both with and without the threading turned on.
In the case of running it with "no threading" the 3 slow requests block (each taking a little over a second to complete). A few of the 20 fast requests squeeze through in between some of the slow requests within the ioloop (not totally sure how that occurs - but could be an artifact that I am running both the server and client test script on the same machine). The point here being that all of the fast requests are held up to varying degrees.
In the case of running it with threading enabled, the 20 fast requests all complete first immediately and the three slow requests complete at about the same time afterwards, as they have each been running in parallel. This is the desired behavior. The three slow requests take 2.5 seconds to complete in parallel - whereas in the non-threaded case the three slow requests take about 3.5 seconds in total. So there is about a 35% speed-up overall (I assume due to multicore sharing). But more importantly - the fast requests were handled immediately, ahead of the slow ones.
I do not have a lot of experience with multithreaded programming - so while this seemingly works here, I am curious to learn:
Is there a simpler way to accomplish this? What monsters may be lurking within this approach?
(Note: A future tradeoff may be to just run more instances of Tornado with a reverse proxy like nginx doing load balancing. No matter what, I will be running multiple instances with a load balancer - but I am concerned about just throwing hardware at this problem, since the hardware seems so directly coupled to the problem in terms of the blocking.)
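(For reference on the multiple-instances point: Tornado itself can fork one server process per core, which gives several instances behind one port without an external supervisor. A minimal sketch, reusing the application object defined in the server below:)

from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop

server = HTTPServer(application)
server.bind(8888)
server.start(0)  # 0 means fork one server process per CPU core
IOLoop.instance().start()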
multi_tornado.py (sample server):
import time
import threading
import multiprocessing
import math

from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop


# run in some other process - put result in q
def heavy_lifting(q):
    t0 = time.time()
    for k in range(2000):
        math.factorial(k)
    t = time.time()
    q.put(t - t0)  # report time to compute in queue


class FastHandler(RequestHandler):
    def get(self):
        res = 'fast result ' + self.get_argument('id')
        print res
        self.write(res)
        self.flush()


class MultiThreadedHandler(RequestHandler):
    # Note: This handler can be called with threaded = True or False
    def initialize(self, threaded=True):
        self._threaded = threaded
        self._q = multiprocessing.Queue()

    def start_process(self, worker, callback):
        # method to start process and watcher thread
        self._callback = callback

        if self._threaded:
            # launch process
            multiprocessing.Process(target=worker, args=(self._q,)).start()
            # start watching for process to finish
            threading.Thread(target=self._watcher).start()
        else:
            # threaded = False just call directly and block
            worker(self._q)
            self._watcher()

    def _watcher(self):
        # watches the queue for process result
        while self._q.empty():
            time.sleep(0)  # relinquish control if not ready

        # put callback back into the ioloop so we can finish request
        response = self._q.get(False)
        IOLoop.instance().add_callback(lambda: self._callback(response))


class SlowHandler(MultiThreadedHandler):
    @asynchronous
    def get(self):
        # start a process for the heavy lifting and a thread to watch for the result
        self.start_process(heavy_lifting, self._on_response)

    def _on_response(self, delta):
        _id = self.get_argument('id')
        res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
        print res
        self.write(res)
        self.flush()
        self.finish()  # be sure to finish request


application = Application([
    (r"/fast", FastHandler),
    (r"/slow", SlowHandler, dict(threaded=False)),
    (r"/slow_threaded", SlowHandler, dict(threaded=True)),
])


if __name__ == "__main__":
    application.listen(8888)
    IOLoop.instance().start()
call_multi.py (client tester):
import sys

from tornado.ioloop import IOLoop
from tornado import httpclient


def run(slow):
    def show_response(res):
        print res.body

    # make 3 "slow" requests on server
    requests = []
    for k in xrange(3):
        uri = 'http://localhost:8888/{}?id={}'
        requests.append(uri.format(slow, str(k + 1)))

    # followed by 20 "fast" requests
    for k in xrange(20):
        uri = 'http://localhost:8888/fast?id={}'
        requests.append(uri.format(k + 1))

    # show results as they return
    http_client = httpclient.AsyncHTTPClient()

    print 'Scheduling Get Requests:'
    print '------------------------'
    for req in requests:
        print req
        http_client.fetch(req, show_response)

    # execute requests on server
    print '\nStart sending requests....'
    IOLoop.instance().start()


if __name__ == '__main__':
    scenario = sys.argv[1]

    if scenario == 'slow' or scenario == 'slow_threaded':
        run(scenario)
Test Results
By running python call_multi.py slow (the blocking behavior):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20
By running python call_multi.py slow_threaded (the desired behavior):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s
Recommended Answer
If you're willing to use concurrent.futures.ProcessPoolExecutor instead of multiprocessing, this is actually very simple. Tornado's ioloop already supports concurrent.futures.Future, so they'll play nicely together out of the box. concurrent.futures is included in Python 3.2+, and has been backported to Python 2.x.
Here's an example:
import time
from concurrent.futures import ProcessPoolExecutor
from tornado.ioloop import IOLoop
from tornado import gen


def f(a, b, c, blah=None):
    print "got %s %s %s and %s" % (a, b, c, blah)
    time.sleep(5)
    return "hey there"


@gen.coroutine
def test_it():
    pool = ProcessPoolExecutor(max_workers=1)
    fut = pool.submit(f, 1, 2, 3, blah="ok")  # This returns a concurrent.futures.Future
    print("running it asynchronously")
    ret = yield fut
    print("it returned %s" % ret)
    pool.shutdown()


IOLoop.instance().run_sync(test_it)
Output:
running it asynchronously
got 1 2 3 and ok
it returned hey there
ProcessPoolExecutor has a more limited API than multiprocessing.Pool, but if you don't need the more advanced features of multiprocessing.Pool, it's worth using because the integration is so much simpler.
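(Applied to the question's server, this approach would look something like the minimal sketch below. It assumes Tornado 3.1+, where a @gen.coroutine get() keeps the request open until the coroutine finishes, and heavy_lifting_plain is a hypothetical rework of heavy_lifting that returns its result instead of writing to a queue:)

import math
import time

from concurrent.futures import ProcessPoolExecutor
from tornado import gen
from tornado.web import RequestHandler

# one shared pool, reused across requests
pool = ProcessPoolExecutor()


def heavy_lifting_plain():
    # hypothetical variant of heavy_lifting that returns its result directly;
    # it must stay a module-level function so it can be pickled for the pool
    t0 = time.time()
    for k in range(2000):
        math.factorial(k)
    return time.time() - t0


class SlowPoolHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        # yielding the concurrent.futures.Future suspends this handler
        # while the ioloop stays free to serve other requests
        delta = yield pool.submit(heavy_lifting_plain)
        self.write('slow result {} <--- {:0.3f} s'.format(
            self.get_argument('id'), delta))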