問(wèn)題描述
我正在編寫(xiě)一個(gè)有一個(gè)生產(chǎn)者和多個(gè)消費(fèi)者的服務(wù)器程序,讓我感到困惑的是只有第一個(gè)放入隊(duì)列的任務(wù)生產(chǎn)者得到消耗,之后排隊(duì)的任務(wù)不再被消耗,它們?nèi)匀淮嬖谟肋h(yuǎn)在隊(duì)列中.
I am writing a server program with one producer and multiple consumers, what confuses me is only the first task producer put into the queue gets consumed, after which tasks enqueued no longer get consumed, they remain in the queue forever.
from multiprocessing import Process, Queue, cpu_count
from http import httpserv
import time
def work(queue):
while True:
task = queue.get()
if task is None:
break
time.sleep(5)
print "task done:", task
queue.put(None)
class Manager:
def __init__(self):
self.queue = Queue()
self.NUMBER_OF_PROCESSES = cpu_count()
def start(self):
self.workers = [Process(target=work, args=(self.queue,))
for i in xrange(self.NUMBER_OF_PROCESSES)]
for w in self.workers:
w.start()
httpserv(self.queue)
def stop(self):
self.queue.put(None)
for i in range(self.NUMBER_OF_PROCESSES):
self.workers[i].join()
queue.close()
Manager().start()
生產(chǎn)者是一個(gè) HTTP 服務(wù)器,一旦接收到任務(wù),它就會(huì)將任務(wù)放入隊(duì)列中來(lái)自用戶(hù)的請(qǐng)求.看來(lái)消費(fèi)者流程還在當(dāng)隊(duì)列中有新任務(wù)時(shí)阻塞,這很奇怪.
The producer is a HTTP server which put a task in the queue once receive a request from the user. It seems that consumer processes are still blocked when there are new tasks in the queue, which is weird.
附:另外兩個(gè)與上述無(wú)關(guān)的問(wèn)題,我不確定是否最好將 HTTP 服務(wù)器放在自己的進(jìn)程中而不是主進(jìn)程中進(jìn)程,如果是,我怎樣才能讓主進(jìn)程繼續(xù)運(yùn)行子進(jìn)程結(jié)束.第二個(gè)問(wèn)題,什么是最好的方法來(lái)阻止HTTP 服務(wù)器優(yōu)雅嗎?
P.S. Another two questions not relating to the above, I am not sure if it's better to put HTTP server in its own process other than the main process, if yes how can I make the main process keep running before all children processes end. Second question, what's the best way to stop the HTTP server gracefully?
編輯:添加生產(chǎn)者代碼,它只是一個(gè)簡(jiǎn)單的python wsgi服務(wù)器:
Edit: add producer code, it's just a simple python wsgi server:
import fapws._evwsgi as evwsgi
from fapws import base
def httpserv(queue):
evwsgi.start("0.0.0.0", 8080)
evwsgi.set_base_module(base)
def request_1(environ, start_response):
start_response('200 OK', [('Content-Type','text/html')])
queue.put('task_1')
return ["request 1!"]
def request_2(environ, start_response):
start_response('200 OK', [('Content-Type','text/html')])
queue.put('task_2')
return ["request 2!!"]
evwsgi.wsgi_cb(("/request_1", request_1))
evwsgi.wsgi_cb(("/request_2", request_2))
evwsgi.run()
推薦答案
我認(rèn)為 Web 服務(wù)器部分一定有問(wèn)題,因?yàn)樗\(yùn)行良好:
I think there must be something wrong with the web server part, as this works perfectly:
from multiprocessing import Process, Queue, cpu_count
import random
import time
def serve(queue):
works = ["task_1", "task_2"]
while True:
time.sleep(0.01)
queue.put(random.choice(works))
def work(id, queue):
while True:
task = queue.get()
if task is None:
break
time.sleep(0.05)
print "%d task:" % id, task
queue.put(None)
class Manager:
def __init__(self):
self.queue = Queue()
self.NUMBER_OF_PROCESSES = cpu_count()
def start(self):
print "starting %d workers" % self.NUMBER_OF_PROCESSES
self.workers = [Process(target=work, args=(i, self.queue,))
for i in xrange(self.NUMBER_OF_PROCESSES)]
for w in self.workers:
w.start()
serve(self.queue)
def stop(self):
self.queue.put(None)
for i in range(self.NUMBER_OF_PROCESSES):
self.workers[i].join()
self.queue.close()
Manager().start()
樣本輸出:
starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1
這篇關(guān)于python多處理的生產(chǎn)者/消費(fèi)者問(wèn)題的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,也希望大家多多支持html5模板網(wǎng)!