ThreadPoolExecutor vs. asyncio:完整解析與實戰示例
更新日期: 2025 年 2 月 13 日
本文為 Python API 優化基礎 系列文,第 6 篇:
- 什麼是 asyncio?——Python 的非同步編程核心
- 高效能快取解決方案——深入解析 AioCache 套件
- 執行緒(Thread)是什麼?完整解析多執行緒與進程的差異與應用
- 完整解析 ThreadPoolExecutor:Python 高效併發工具
- CPU 運算 vs. I/O 操作:執行緒與進程的最佳實踐
- ThreadPoolExecutor vs. asyncio:完整解析與實戰示例 👈進度
在 Python 中,我們有兩種方式來提高 I/O 操作的效率:
- 多執行緒(Threading): 使用
ThreadPoolExecutor
來並行執行同步函式(如requests
)。 - 非同步(Asynchronous): 使用
asyncio
來非同步執行支援async
的函式(如aiohttp
)。
這兩者的核心概念不同,許多開發者容易混淆:
ThreadPoolExecutor
使用多執行緒來並行運行同步函式asyncio
使用單執行緒的事件迴圈來切換非同步函式
ThreadPoolExecutor
(多執行緒)
什麼是多執行緒(Threading)?
執行緒(Thread) 是程式執行的最小單位,透過多執行緒(Multithreading),我們可以同時執行多個任務,提高效率。
ThreadPoolExecutor
如何運作?
- 主要目標:在同步環境中,提高 I/O 操作的執行效率。
- 工作方式:使用多個執行緒同時執行不同的任務,讓程式看起來「同時」處理多個請求。
- 適用場景:
- 多個 API 請求(使用
requests
) - 批量讀取或寫入檔案
- 與資料庫進行多個查詢
- 多個 API 請求(使用
ThreadPoolExecutor
示例
📍 讓同步 requests
並行運行
import requests
from concurrent.futures import ThreadPoolExecutor
URLS = [
"https://jsonplaceholder.typicode.com/todos/1",
"https://jsonplaceholder.typicode.com/todos/2",
"https://jsonplaceholder.typicode.com/todos/3",
]
def fetch(url):
"""發送 HTTP 請求"""
response = requests.get(url)
return response.json()
# 使用 ThreadPoolExecutor 讓多個請求同時執行
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(fetch, URLS))
print(results)
詳細解析
當我們使用 requests.get(url)
發送 API 請求時,它是一個「同步(Blocking)操作」,這意味著:
- 當
requests.get(url)
被呼叫時,程式會停下來等待伺服器回應,直到收到完整的回應後,才會繼續執行下一行程式碼。 - 如果我們需要發送多個請求,預設情況下,它們會一個接一個依序執行,導致效率低落。
為了解決這個問題,我們可以使用 ThreadPoolExecutor
,透過「多執行緒(Threading)」來並行發送多個 API 請求,提高執行效率。
📍requests.get(url)
是同步(Blocking)操作
我們先看一個單執行緒發送 API 請求的範例:
import requests
import time
URLS = [
"https://jsonplaceholder.typicode.com/todos/1",
"https://jsonplaceholder.typicode.com/todos/2",
"https://jsonplaceholder.typicode.com/todos/3",
]
def fetch(url):
print(f"🔄 發送請求: {url}")
response = requests.get(url) # 這是一個同步操作,程式會卡在這裡
print(f"✅ 完成請求: {url}")
return response.json()
start_time = time.time()
results = [fetch(url) for url in URLS] # 依序發送 3 個請求
end_time = time.time()
print(f"總執行時間: {end_time - start_time:.2f} 秒")
requests.get(url)
一次只能發送一個請求,程式必須等這個請求完成後,才能發送下一個請求。- 如果每個請求都需要 1 秒,那麼總執行時間至少 3 秒。
- 問題:如果我們有 1000 個 API 請求,這樣的方式會非常慢。
📍ThreadPoolExecutor
如何加速 API 請求?
為了解決「同步操作導致等待時間過長」的問題,我們可以使用 ThreadPoolExecutor
,透過多執行緒(Threading) 讓程式同時發送多個請求。
ThreadPoolExecutor
讓 requests
同時發送多個請求
import requests
import time
from concurrent.futures import ThreadPoolExecutor
URLS = [
"https://jsonplaceholder.typicode.com/todos/1",
"https://jsonplaceholder.typicode.com/todos/2",
"https://jsonplaceholder.typicode.com/todos/3",
]
def fetch(url):
print(f"🔄 發送請求: {url}")
response = requests.get(url) # 這是一個同步操作,但透過 ThreadPoolExecutor 讓它並行執行
print(f"✅ 完成請求: {url}")
return response.json()
start_time = time.time()
# 使用 ThreadPoolExecutor 讓 3 個請求同時執行
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(fetch, URLS))
end_time = time.time()
print(f"總執行時間: {end_time - start_time:.2f} 秒")
ThreadPoolExecutor(max_workers=3)
開 3 個執行緒- 這意味著 Python 會同時啟動 3 個執行緒 來執行
fetch(url)
函式。 - 每個執行緒負責 1 個 API 請求,不需要等其他請求完成才能開始新的請求。
- 這意味著 Python 會同時啟動 3 個執行緒 來執行
executor.map(fetch, URLS)
讓多個請求同時執行executor.map()
會自動把URLS
中的每個網址 分配給執行緒,讓它們並行發送請求。- 這樣,我們的 3 個請求可以「同時發送」,不需要等上一個請求完成後才開始下一個。
- 執行時間大幅縮短
- 在「單執行緒」的情況下,3 個請求需要 3 秒(1 秒/請求)。
- 使用
ThreadPoolExecutor
,3 個請求可以同時執行,總時間約 1 秒。 - 節省了 2/3 的時間,提高了 3 倍效能!
📍ThreadPoolExecutor
vs. 單執行緒對比
方法 | 執行方式 | 總執行時間(假設每個請求 1 秒) |
---|---|---|
單執行緒(同步) | 逐個請求,等待回應後才發送下一個 | ⏳ 3 秒 |
ThreadPoolExecutor(多執行緒) | 同時發送 3 個請求 | 🚀 1 秒 |
這就是 ThreadPoolExecutor
的強大之處!
它不會讓 requests.get(url)
變成非同步函式,但它透過多執行緒讓多個同步函式同時執行,大幅提升效能。
asyncio
(非同步事件驅動)
什麼是非同步(Asynchronous)?
非同步(Asynchronous) 是一種事件驅動(Event-Driven) 模型,允許我們在等待 I/O 操作時切換任務,而不是讓 CPU 閒置。
asyncio
如何運作?
- 主要目標:在單一執行緒內,同時執行多個 I/O 任務,而不會互相阻塞。
- 工作方式:
asyncio
使用事件迴圈(Event Loop) 來管理非同步函式(coroutines)。- 當一個函式遇到
await
時,程式會「暫停」該函式,然後去執行其他任務。 asyncio
不會自動使用多執行緒或多進程,它完全依賴單一執行緒 + 任務切換。
asyncio
示例
📍 讓 aiohttp
非同步發送 API 請求
import aiohttp
import asyncio
URLS = [
"https://jsonplaceholder.typicode.com/todos/1",
"https://jsonplaceholder.typicode.com/todos/2",
"https://jsonplaceholder.typicode.com/todos/3",
]
async def fetch(session, url):
"""使用 aiohttp 非同步發送請求"""
async with session.get(url) as response:
return await response.json()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in URLS]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
詳細解析
當我們使用 asyncio
搭配 aiohttp
來發送 HTTP 請求時,與傳統的 requests
模組有很大不同。讓我們逐步拆解這段程式碼的運作方式。
📍 aiohttp
是 asyncio
兼容的 HTTP 請求庫
在 Python 中,最常見的 HTTP 請求庫是 requests
,但 requests.get(url)
是同步(Blocking)操作,會阻塞整個程式,直到伺服器回應後才能繼續執行。
aiohttp
則是一個專為 asyncio
設計的非同步 HTTP 庫,它允許我們使用 async
和 await
來非同步發送 HTTP 請求,而不會卡住程式。
🔹requests
(同步,會阻塞)
import requests
def fetch(url):
response = requests.get(url)
return response.json()
print(fetch("https://jsonplaceholder.typicode.com/todos/1")) # 這行會等待請求完成
問題:
requests.get(url)
是同步的,請求發送後必須等伺服器回應,程式才會繼續執行。- 如果有多個 API 請求,這些請求會「依序執行」,導致效率低落。
🔹 aiohttp
(非同步,不會阻塞)
import aiohttp
import asyncio
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
print(asyncio.run(fetch("https://jsonplaceholder.typicode.com/todos/1")))
為什麼 aiohttp
更好?
async with aiohttp.ClientSession()
允許在非同步環境下建立 HTTP 連線。await response.json()
不會阻塞其他任務,當fetch()
等待回應時,事件迴圈可以執行其他任務。- 適合大量 API 請求,不會一個一個等待,讓程式變得更高效。
📍 asyncio.gather()
讓所有請求「同時發送」,而不會等待彼此完成
通常在同步環境下,我們需要一個一個發送請求:
results = [fetch(url) for url in URLS] # 這會逐個發送請求
但 asyncio.gather()
允許我們一次性發送所有請求,並等待它們的結果。
🔹asyncio.gather()
並行處理
import aiohttp
import asyncio
URLS = [
"https://jsonplaceholder.typicode.com/todos/1",
"https://jsonplaceholder.typicode.com/todos/2",
"https://jsonplaceholder.typicode.com/todos/3",
]
async def fetch(session, url):
""" 非同步發送 HTTP 請求 """
async with session.get(url) as response:
return await response.json()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in URLS] # 建立任務列表
results = await asyncio.gather(*tasks) # 並行執行所有任務
print(results)
asyncio.run(main())
tasks = [fetch(session, url) for url in URLS]
- 建立一個「待執行的非同步函式列表」,但這些函式還沒執行。
await asyncio.gather(*tasks)
- 讓所有
fetch()
任務同時執行,不需要等一個請求完成後再發送下一個。 - 這與傳統同步程式的「一個一個請求」完全不同。
- 讓所有
🔹 asyncio.gather()
的關鍵優勢
方法 | 請求方式 | 效率 |
---|---|---|
同步(requests) | 逐個發送,每個請求都要等待前一個完成 | 慢 |
非同步 asyncio.gather() | 一次發送所有請求,不互相等待 | 快 |
這讓 Python 可以像 JavaScript
的 Promise.all()
一樣,並行執行多個請求!
📍ThreadPoolExecutor
傳統同步環境
在傳統的同步環境中,ThreadPoolExecutor
允許我們並行執行同步函式,例如 requests.get()
:
from concurrent.futures import ThreadPoolExecutor
import requests
def fetch(url):
return requests.get(url).json()
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(fetch, URLS))
問題:
ThreadPoolExecutor
是多執行緒技術,需要開啟多個執行緒來處理同步任務。requests.get(url)
仍然是同步函式,只是分配到不同的執行緒上執行,效能提升有限。- 不是真正的非同步運行!
🔹 aiohttp
+ asyncio
不需要 ThreadPoolExecutor
async def fetch(session, url):
async with session.get(url) as response:
return await response.json()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in URLS]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
這樣的做法完全不需要 ThreadPoolExecutor
,因為 aiohttp
原生支援 asyncio
,能夠真正做到非同步處理。
ThreadPoolExecutor
vs. asyncio
的核心區別
特性 | ThreadPoolExecutor (多執行緒) | asyncio (非同步) |
---|---|---|
並行方式 | 多個執行緒 | 單執行緒 + 事件迴圈 |
適合場景 | 同步 I/O 任務(如 requests.get()、檔案讀寫) | 支援 async 的函式(如 aiohttp) |
是否受 GIL 限制 | 是(但 I/O 任務影響不大) | 否(非同步處理,不影響 GIL) |
是否適合 CPU 密集型運算 | ❌ 不適合 | ❌ 不適合 |
錯誤示範 | asyncio.run(ThreadPoolExecutor()) | asyncio.run(requests.get())(會卡住) |
asyncio
如何搭配 ThreadPoolExecutor
?
如果我們必須在 asyncio
內執行同步函式(如 requests.get()
),可以用 ThreadPoolExecutor
來轉換。
ThreadPoolExecutor
在 asyncio
中的角色
ThreadPoolExecutor
本質上是同步的執行緒池,但當它與 asyncio
搭配時,透過 loop.run_in_executor()
,我們可以讓同步函式以非同步的方式運行,避免阻塞 asyncio
。
ThreadPoolExecutor
在 asyncio
內的運作流程
sequenceDiagram participant EventLoop as asyncio 事件迴圈 participant ThreadPool as ThreadPoolExecutor (執行緒池) participant Worker1 as 執行緒 1 (執行同步函式) participant Worker2 as 執行緒 2 (執行同步函式) EventLoop->>ThreadPool: 使用 loop.run_in_executor() 提交同步函式 ThreadPool->>Worker1: 啟動執行緒 1 執行函式 ThreadPool->>Worker2: 啟動執行緒 2 執行函式 EventLoop->>EventLoop: 繼續執行其他 asyncio 任務 (非同步) Worker1-->>ThreadPool: 任務完成,返回結果 Worker2-->>ThreadPool: 任務完成,返回結果 ThreadPool-->>EventLoop: 回傳結果給 asyncio EventLoop->>EventLoop: 處理 run_in_executor() 的回傳值
這個流程發生了什麼?
asyncio
事件迴圈(EventLoop)啟動asyncio
使用loop.run_in_executor()
,將fetch()
提交給ThreadPoolExecutor
ThreadPoolExecutor
啟動新的執行緒Worker1
來執行fetch()
fetch()
呼叫requests.get(URL)
,向遠端 API 發送請求- API 回應 JSON,
Worker1
執行緒完成請求 Worker1
將結果回傳給ThreadPoolExecutor
ThreadPoolExecutor
再將結果回傳給asyncio
事件迴圈asyncio
事件迴圈繼續執行其他非同步任務,並處理fetch()
的回傳值
重點整理
當我們說 ThreadPoolExecutor 本質上是同步的,意思是它本來是為了 標準的、多執行緒的阻塞式(同步)程式設計 而設計的。
它允許你在背景執行某些任務,但這些任務本身仍然是 同步執行的,只是它們跑在不同的執行緒中,不會阻塞主執行緒。
但是,asyncio 是基於單執行緒的事件迴圈,主要用於 非同步 I/O 操作,它不會直接支援 ThreadPoolExecutor 這種基於執行緒的並行機制。
因此,asyncio 提供了 run_in_executor()
這個方法,讓開發者可以把 同步(阻塞)任務放進 ThreadPoolExecutor,並透過 await
讓它在事件迴圈內執行,這樣就能讓 同步的 ThreadPoolExecutor 變得「非同步友好」。
舉個例子:
假設有一個同步函式 blocking_task()
,它會執行一些耗時的操作:
import time
def blocking_task():
time.sleep(3) # 模擬一個同步的阻塞操作
return "Done"
這是一個標準的 同步函式,如果直接在 asyncio
內執行,它會阻塞整個事件迴圈。但我們可以用 run_in_executor()
把它丟進 ThreadPoolExecutor
來非同步執行:
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor()
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, blocking_task)
print(result)
asyncio.run(main())
這裡 run_in_executor()
的作用就是:
- 把同步函式
blocking_task()
丟進ThreadPoolExecutor
執行,讓它在單獨的執行緒裡跑。 - 不阻塞
asyncio
的事件迴圈,因為await
讓asyncio
可以在等待結果的同時執行其他非同步任務。
asyncio
+ ThreadPoolExecutor
程式碼解析
現在,我們來看看如何在 asyncio
中使用 ThreadPoolExecutor
,讓同步函式不會影響 asyncio
的非同步運作。
import asyncio
import requests
from concurrent.futures import ThreadPoolExecutor
URL = "https://jsonplaceholder.typicode.com/todos/1"
def fetch():
"""🔹 這是一個同步函式"""
return requests.get(URL).json() # 使用 requests.get() 發送同步 HTTP 請求
async def main():
loop = asyncio.get_running_loop() # 取得目前的 asyncio 事件迴圈
with ThreadPoolExecutor() as executor: # 建立執行緒池
result = await loop.run_in_executor(executor, fetch) # 在執行緒池中執行同步函式
print(result)
asyncio.run(main())
📍逐步解析
🔹1. def fetch()
def fetch():
"""🔹 這是一個同步函式"""
return requests.get(URL).json() # 使用 requests.get() 發送同步 HTTP 請求
fetch()
是同步函式,執行時會阻塞執行緒:
requests.get(URL)
是同步操作,當執行時,程式會等待 API 回應後才繼續執行。- 如果
fetch()
直接在asyncio
內部執行,asyncio
會被卡住,無法處理其他任務。
🔹 2. loop = asyncio.get_running_loop()
loop = asyncio.get_running_loop()
asyncio.get_running_loop()
取得當前執行中的asyncio
事件迴圈。asyncio
事件迴圈負責管理所有async
任務,確保它們可以非同步執行。
🔹 3. 建立 ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
- 建立一個執行緒池(ThreadPoolExecutor),讓
fetch()
在額外的執行緒內執行。 - 避免
fetch()
直接執行在asyncio
事件迴圈內,導致卡住其他非同步任務。
🔹 4. loop.run_in_executor(executor, fetch)
result = await loop.run_in_executor(executor, fetch)
- 讓
fetch()
在ThreadPoolExecutor
內的執行緒執行。 asyncio
事件迴圈不會被fetch()
卡住,仍可執行其他async
任務。- 當
fetch()
完成時,await
會取得結果,並返回給result
變數。
這樣的做法讓同步函式 fetch()
在 asyncio
內部執行,而不會影響 asyncio
的非同步性。
🔹 5. 啟動 asyncio
asyncio.run(main())
- 啟動
asyncio
事件迴圈,執行main()
函式。 main()
是非同步函式,因此asyncio.run(main())
負責運行整個asyncio
非同步架構。
總結
✅ ThreadPoolExecutor
適合多執行緒並行執行同步函式,如 requests
或檔案讀寫。
✅ asyncio
適合處理 async
函式,如 aiohttp
,但不能直接執行同步函式。
✅ 可以用 ThreadPoolExecutor
讓同步函式在 asyncio
環境中運行,解決 asyncio
無法處理同步任務的問題。 🚀