歡迎您光臨本站 註冊首頁

Python併發concurrent.futures和asyncio實例

←手機掃碼閱讀     retouched @ 2020-05-04 , reply:0

從Python3.2開始,標準庫為我們提供了concurrent.futures模塊,concurrent.futures 模塊的主要特色是 ThreadPoolExecutor 和ProcessPoolExecutor 類,這兩個類實現的接口能分別在不同的線程或進程中執行可調用的對象。這兩個類在內部維護著一個工作線程或進程池,以及要執行的任務隊列。Python 3.4 以後標準庫中asyncio 包,這個包使用事件循環驅動的協程實現併發。這是 Python 中最大也是最具雄心壯志的庫之一。asyncio 大量使用 yield from 表達式,因此與Python 舊版不兼容。submit和map方法submit方法作用是向線程池提交可回調的task,並返回一個回調實例。

example:

import time from concurrent.futures 

import ThreadPoolExecutor 

# 可回調的

task def pub_task(msg): time.sleep(3) return msg 

# 創建一個線程池 

pool = ThreadPoolExecutor(max_workers=3) 

# 往線程池加入2個

task task1 = pool.submit(pub_task, 'a')

 task2 = pool.submit(pub_task, 'b') 

print(task1.done()) 

# False time.sleep(4) 

print(task2.done()) 

# True print(task1.result()) 

print(task2.result())

map方法是創建一個迭代器,回調的結果有序放在迭代器中。問題:Executor.map 函數易於使用,不過有個特性可能有用,也可能沒用,具體情況取決於需求:這個函數返回結果的順序與調用開始的順序一致。如果第一個調用生成結果用時 10秒,而其他調用只用 1 秒,代碼會阻塞 10 秒,獲取 map 方法返回的生成器產出的第一個結果。在此之後,獲取後續結果時不會阻塞,因為後續的調用已經結束。如果必須等到獲取所有結果後再處理,這種行為沒問題;不過,通常更可取的方式是,不管提交的順序,只要有結果就獲取。為此,要把 Executor.submit 方法和 futures.as_completed 函數結合起來使用。from concurrent.futures import ThreadPoolExecutor import requests URLS = ['http://www.csdn.com', 'http://qq.com', 'http://www.leasonlove.cn'] def task(url, timeout=10): return requests.get(url, timeout=timeout) pool = ThreadPoolExecutor(max_workers=3) results = pool.map(task, URLS) for ret in results: print('%s, %s' % (ret.url, ret))future異步編程Future可以理解為一個在未來完成的操作,這是異步編程的基礎。通常情況下,我們執行io操作,訪問url時(如下)在等待結果返回之前會產生阻塞,cpu不能做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他的操作。from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completed import requests URLS = ['http://www.csdn.cn', 'http://qq.com', 'http://www.leasonlove.cn'] def task(url, timeout=1): return requests.get(url, timeout=timeout) with ThreadPoolExecutor(max_workers=3) as executor: future_tasks = [executor.submit(task, url) for url in URLS] for f in future_tasks: if f.running(): print('%s is running' % str(f)) for f in as_completed(future_tasks): try: ret = f.done() if ret: f_ret = f.result() print('%s, done, result: %s, %s' % (str(f), f_ret.url, f_ret.content)) except Exception as e: # 第一個url無響應 f.cancel() print(str(e))asyncio庫協程實現併發對於gevent 和 asyncio 建議大家放棄Gevent,擁抱asyncio,asyncio是Python3.4以後標準庫。而且由於Gevent直接修改標準庫裡面大部分的阻塞式系統調用,包括socket、ssl、threading和 select等模塊,而變為協作式運行。但是我們無法保證你在複雜的生產環境中有哪些地方使用這些標準庫會由於打了補丁而出現奇怪的問題。import asyncio import time start = time.time() async def do(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Finish after {}s'.format(x) task1 = do(1) task2 = do(2) task3 = do(4) tasks = [ asyncio.ensure_future(task1), asyncio.ensure_future(task2), asyncio.ensure_future(task3) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) for task in tasks: print('Task result: ', task.result()) end = time.time() print('TIME: ', end - start)協程與線程如果使用線程做過重要的編程,你就知道寫出程序有多麼困難,因為調度程序任何時候都能中斷線程。必須記住保留鎖,去保護程序中的重要部分,防止多步操作在執行的過程中中斷,防止數據處於無效狀態。而協程默認會做好全方位保護,以防止中斷。我們必須顯式產出才能讓程序的餘下部分運行。對協程來說,無需保留鎖,在多個線程之間同步操作,協程自身就會同步,因為在任意時刻只有一個協程運行。想交出控制權時,可以使用 yield 或 yield from 把控制權交還調度程序。這就是能夠安全地取消協程的原因:按照定義,協程只能在暫停的 yield處取消,因此可以處理 CancelledError 異常,執行清理操作。補充知識:Python-什麼時候使用yield?簡介很多時候在python代碼中見到了yield,沒有系統學習過,自己也沒有用過。yield語句延遲了語句的執行,然後發送了一個值給調用者,但保留了一定的狀態去保證函數離開之後可以繼續。當繼續的時候,函數繼續執行上一個的運行狀態。這使得它的代碼可以隨著時間產生一系列的值,而不是立即執行,然後像一個list一樣發送他們回來。例子例子1:# A Simple Python program to demonstrate working # of yield # A generator function that yields 1 for first time, # 2 second time and 3 third time def simpleGeneratorFun(): yield 1 yield 2 yield 3 # Driver code to check above generator function for value in simpleGeneratorFun(): print(value)

返回語句發送一個特殊的值給它的調用者,而yield產生了一系列的值,當我們想要遍歷一個序列的時候,我們應該使用yield,但不想要把整個序列存儲在內存中。yield用於python的生成器(generator)。一個genertator 被定義得看起來像一個普通函數一樣,但它需要產生一個數字得時候,它使用yield,而不是使用return。如果一個函數裡面定義了yield,那麼它自動稱為了一個generator函數。


例子2:

# A Python program to generate squares from 1 # to 100 using yield and therefore generator # An infinite generator function that prints # next square number. It starts with 1 def nextSquare(): i = 1; # An Infinite loop to generate squares while True: yield i*i i += 1 # Next execution resumes # from this point # Driver code to test above generator # function for num in nextSquare(): if num > 100: break print(num)輸出1,4,9…100


[retouched ] Python併發concurrent.futures和asyncio實例已經有325次圍觀

http://coctec.com/docs/python/shhow-post-232873.html