問題描述
我正在編寫一個有一個生產者和多個消費者的服務器程序,讓我感到困惑的是只有第一個放入隊列的任務生產者得到消耗,之后排隊的任務不再被消耗,它們仍然存在永遠在隊列中.
I am writing a server program with one producer and multiple consumers, what confuses me is only the first task producer put into the queue gets consumed, after which tasks enqueued no longer get consumed, they remain in the queue forever.
from multiprocessing import Process, Queue, cpu_count
from http import httpserv
import time
def work(queue):
while True:
task = queue.get()
if task is None:
break
time.sleep(5)
print "task done:", task
queue.put(None)
class Manager:
def __init__(self):
self.queue = Queue()
self.NUMBER_OF_PROCESSES = cpu_count()
def start(self):
self.workers = [Process(target=work, args=(self.queue,))
for i in xrange(self.NUMBER_OF_PROCESSES)]
for w in self.workers:
w.start()
httpserv(self.queue)
def stop(self):
self.queue.put(None)
for i in range(self.NUMBER_OF_PROCESSES):
self.workers[i].join()
queue.close()
Manager().start()
生產者是一個 HTTP 服務器,一旦接收到任務,它就會將任務放入隊列中來自用戶的請求.看來消費者流程還在當隊列中有新任務時阻塞,這很奇怪.
The producer is a HTTP server which put a task in the queue once receive a request from the user. It seems that consumer processes are still blocked when there are new tasks in the queue, which is weird.
附:另外兩個與上述無關的問題,我不確定是否最好將 HTTP 服務器放在自己的進程中而不是主進程中進程,如果是,我怎樣才能讓主進程繼續運行子進程結束.第二個問題,什么是最好的方法來阻止HTTP 服務器優雅嗎?
P.S. Another two questions not relating to the above, I am not sure if it's better to put HTTP server in its own process other than the main process, if yes how can I make the main process keep running before all children processes end. Second question, what's the best way to stop the HTTP server gracefully?
編輯:添加生產者代碼,它只是一個簡單的python wsgi服務器:
Edit: add producer code, it's just a simple python wsgi server:
import fapws._evwsgi as evwsgi
from fapws import base
def httpserv(queue):
evwsgi.start("0.0.0.0", 8080)
evwsgi.set_base_module(base)
def request_1(environ, start_response):
start_response('200 OK', [('Content-Type','text/html')])
queue.put('task_1')
return ["request 1!"]
def request_2(environ, start_response):
start_response('200 OK', [('Content-Type','text/html')])
queue.put('task_2')
return ["request 2!!"]
evwsgi.wsgi_cb(("/request_1", request_1))
evwsgi.wsgi_cb(("/request_2", request_2))
evwsgi.run()
推薦答案
我認為 Web 服務器部分一定有問題,因為它運行良好:
I think there must be something wrong with the web server part, as this works perfectly:
from multiprocessing import Process, Queue, cpu_count
import random
import time
def serve(queue):
works = ["task_1", "task_2"]
while True:
time.sleep(0.01)
queue.put(random.choice(works))
def work(id, queue):
while True:
task = queue.get()
if task is None:
break
time.sleep(0.05)
print "%d task:" % id, task
queue.put(None)
class Manager:
def __init__(self):
self.queue = Queue()
self.NUMBER_OF_PROCESSES = cpu_count()
def start(self):
print "starting %d workers" % self.NUMBER_OF_PROCESSES
self.workers = [Process(target=work, args=(i, self.queue,))
for i in xrange(self.NUMBER_OF_PROCESSES)]
for w in self.workers:
w.start()
serve(self.queue)
def stop(self):
self.queue.put(None)
for i in range(self.NUMBER_OF_PROCESSES):
self.workers[i].join()
self.queue.close()
Manager().start()
樣本輸出:
starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1
這篇關于python多處理的生產者/消費者問題的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!