歡迎您光臨本站 註冊首頁

python多進程下的生產者和消費者模型

←手機掃碼閱讀     ljg58026 @ 2020-05-08 , reply:0

一、生產者消費者模型介紹
1.1 為什麼需要使用生產者消費者模型
生產者是指生產數據的任務,消費者是指消費數據的任務。當生產者的生產能力遠大於消費者的消費能力,生產者就需要等消費者消費完才能繼續生產新的數據,同理,如果消費者的消費能力遠大於生產者的生產能力,消費者就需要等生產者生產完數據才能繼續消費,這種等待會造成效率的低下,為了解決這種問題就引入了生產者消費者模型。
1.2 如何實現生產者消費者模型
進程間引入隊列可以實現生產者消費者模型,通過使用隊列無需考慮鎖的概念,因為進程間的通信是通過隊列來實現的;
生產者生產的數據往隊列裡面寫,消費者消費數據直接從隊列裡面取,這樣就對實現了生產者和消費者之間的解耦。
生產者 -- > 隊列 <--消費者
二、Queue實現生產者消費者模型
2.1 消費者生產者模型代碼
from multiprocessing import Process, Queue import time 

# 消費者方法 

def consumer(q, name): while True: res = q.get() 

# if res is None: break print("%s 吃了 %s" % (name, res)) 

# 生產者方法 

def producer(q, name, food): for i in range(3): time.sleep(1) 

# 模擬生產西瓜的時間延遲

 res = "%s %s" % (food, i) print("%s 生產了 %s" % (name, res)) 

# 把生產的vegetable放入到隊列中 

q.put(res) 

if __name__ == "__main__": #創建隊列

 q = Queue() # 創建生產者 

p1 = Process(target=producer, args=(q, "kelly", "西瓜")) c1 = Process(target=consumer, args=(q, "peter",)) p1.start() c1.start() # p1.join() # q.put(None) print("主進程")
2.2 執行結果
2.2.1 直接執行上面的代碼的結果
直接執行會出現一個問題就是生產者生產完了,沒有向消費者發送一個停止的信號,所以消費者一直會一直阻塞在q.get(),導致程序無法退出。
為了解決上面的問題,讓消費者消費完了生產者的數據之後自動退出,就需要在生產者進程介紹的時候往隊列裡面put一個結束信號,消費者拿到這個信號,就退出消費進程。
主要是兩個地方修改 ,把下方代碼的註釋打開就可以實現消費者消費完接收到生產者的結束信號就退出消費者進程了。
def consumer(): if res is None: break if __name__ == "__main__": p1.join() q.put(None)
2.2.2 把註釋打開後的運行結果
把註釋打開後,消費者拿到了生產者發送的結束信號,可以正常退出程序了。
但如果有n個消費者,就需要發送n個結束信號,這種方式就不是那麼簡潔,像下面的代碼這樣:
from multiprocessing import Process, Queue import time 

# 消費者方法 

def consumer(q, name): while True: res = q.get() if res is None: break print("%s 吃了 %s" % (name, res)) 

# 生產者方法 

def producer(q, name, food): for i in range(3): time.sleep(1)

 # 模擬生產西瓜的時間延遲

 res = "%s %s" % (food, i) print("%s 生產了 %s" % (name, res)) 

# 把生產的vegetable放入到隊列中 

q.put(res) 

if __name__ == "__main__": # 創建隊列

 q = Queue() # 創建生產者 

p1 = Process(target=producer, args=(q, "kelly", "西瓜")) p2 = Process(target=producer, args=(q, "kelly2", "香蕉")) c1 = Process(target=consumer, args=(q, "peter",)) c2 = Process(target=consumer, args=(q, "peter2",)) c3 = Process(target=consumer, args=(q, "peter3",)) p1.start() p2.start() c1.start() c2.start() c3.start() p1.join() p2.join() q.put(None) q.put(None) q.put(None) print("主進程")
其實我們現在就是生產者生產完數據之後想往隊列裡面發送一個結束信號,python語言提供了另外一種隊列JoinableQueue([maxsize])來解決這種問題
三、JoinableQueue實現生產者消費者模型
3.1 JoinableQueue方法介紹
JoinableQueue([maxsize]) : A queue type which also supports join() and task_done() methods
q.task_done():消費者使用此方法發出信號,表示q.get()的返回項目已經被處理。 q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理;阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止。
3.2 JoinableQueue實現生產者消費者模型源碼
from multiprocessing import Process,JoinableQueue import time 

# 消費者方法 def consumer(q, name): while True: res = q.get() if res is None: break print("%s 吃了 %s" % (name, res)) q.task_done() # 發送信號給q.join(),表示已經從隊列中取走一個值並處理完畢了 

# 生產者方法 def producer(q, name, food): for i in range(3): time.sleep(1) 

# 模擬生產西瓜的時間延遲 res = "%s %s" % (food, i) print("%s 生產了 %s" % (name, res))

 # 把生產的vegetable放入到隊列中 q.put(res) q.join() # 等消費者把自己放入隊列的所有元素取完之後才結束 if __name__ == "__main__": # q = Queue() q = JoinableQueue() # 創建生產者 p1 = Process(target=producer, args=(q, "kelly", "西瓜")) p2 = Process(target=producer, args=(q, "kelly2", "藍莓")) # 創建消費者 c1 = Process(target=consumer, args=(q, "peter",)) c2 = Process(target=consumer, args=(q, "peter2",)) c3 = Process(target=consumer, args=(q, "peter3",)) c1.daemon = True c2.daemon = True c3.daemon = True p_l = [p1, p2, c1, c2, c3] for p in p_l: p.start() p1.join() p2.join() 

# 1.主進程等待p1,p2進程結束才繼續執行 

# 2.由於q.join()的存在,生產者只有等隊列中的元素被消費完才會結束 

# 3.生產者結束了,就代表消費者已經消費完了,也可以結束了,所以可以把消費者設置為守護進程(隨著主進程的退出而退出) print("主進程")
3.3 運行結果
通過運行結果可以看出,生產者沒有手動發送結束信號給消費者,而是通過JoinableQueue隊列的方式也實現了生產者消費者模型。


[ljg58026 ] python多進程下的生產者和消費者模型已經有260次圍觀

http://coctec.com/docs/python/shhow-post-233290.html