本文為 Python API 優化基礎 系列文,第 4 篇:
- 什麼是 asyncio?——Python 的非同步編程核心
- 高效能快取解決方案——深入解析 AioCache 套件
- 執行緒(Thread)是什麼?完整解析多執行緒與進程的差異與應用
- 完整解析 ThreadPoolExecutor:Python 高效併發工具 👈進度
- CPU 運算 vs. I/O 操作:執行緒與進程的最佳實踐
- ThreadPoolExecutor vs. asyncio:完整解析與實戰示例
在 Python 開發中,當我們需要同時執行多個任務時,ThreadPoolExecutor 是一個簡單且高效的工具。
它可以管理多個執行緒(Threads),讓程式能夠更快地處理大量 I/O 操作,如 API 請求、檔案讀取、網路爬取等。
ThreadPoolExecutor 是什麼?
ThreadPoolExecutor 是 Python concurrent.futures 模組的一部分,它提供了一種管理執行緒池(Thread Pool)的方式。
ThreadPoolExecutor 的作用
當我們想要執行多個執行緒時,有兩種方式:
- 手動建立執行緒(使用
threading.Thread) - 使用
ThreadPoolExecutor來自動管理執行緒
舉例來說,假設我們有 10 個 API 請求需要發送:
- 單執行緒:依序執行 10 次 API 請求,總時間較長。
- 多執行緒(Threading):可以同時發送 10 個 API 請求,加快速度。
- 使用
ThreadPoolExecutor:讓 Python 幫我們自動管理執行緒,提高效能且降低複雜度。
ThreadPoolExecutor 的基本用法
讓我們從最基本的例子開始,看看如何使用 ThreadPoolExecutor 來執行多個任務。
建立執行緒池
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
""" 模擬一個需要 2 秒的工作 """
print(f"🔄 任務 {n} 開始執行")
time.sleep(2)
print(f"✅ 任務 {n} 完成")
# 建立一個執行緒池,最多允許 3 個執行緒同時運行
with ThreadPoolExecutor(max_workers=3) as executor:
executor.map(task, range(5)) # 執行 5 個任務
輸出結果
🔄 任務 0 開始執行
🔄 任務 1 開始執行
🔄 任務 2 開始執行
✅ 任務 0 完成
✅ 任務 1 完成
✅ 任務 2 完成
🔄 任務 3 開始執行
🔄 任務 4 開始執行
✅ 任務 3 完成
✅ 任務 4 完成
說明
max_workers=3:最多同時執行 3 個任務,當有執行緒完成時,下一個任務才會開始執行。executor.map(task, range(5)):將task()應用到 0~4 共 5 個任務,但最多 3 個同時執行。
submit() vs. map():ThreadPoolExecutor 任務提交方式解析
在 ThreadPoolExecutor 中,我們可以使用 submit() 或 map() 來提交任務,兩者在行為上有所不同,各有適用的場景。
submit() vs. map() 差異總覽
| 功能 | submit() | map() |
|---|---|---|
| 提交方式 | 逐個提交(單次呼叫處理單一任務) | 批量提交(單次呼叫處理多個任務) |
| 回傳值 | Future 物件(可手動獲取結果) | 直接回傳結果(類似 map() 函式) |
| 適用場景 | 需要逐步處理結果,如異常處理、錯誤重試 | 單輸入對應單輸出的批量處理,如數據運算 |
| 是否可以逐步獲取結果 | 可以(每個 Future 可單獨 .result()) | 不可以(需等待所有結果計算完畢後才返回) |
| 錯誤處理 | 更靈活,可在 Future 物件中捕捉錯誤 | 所有任務會同時執行,無法單獨處理錯誤 |
submit():逐個提交任務,回傳 Future 物件
📌 何時使用 submit()?
- 適合需要異步獲取結果的場景(例如某些任務可能比較慢,可以先處理快的)。
- 適合需要錯誤處理的場景(如果某些任務失敗,我們可以只重試這些失敗的任務)。
- 適合需要動態控制執行緒池內部的工作量(不像
map()一次提交所有任務)。
📍 使用 submit() 提交多個任務
from concurrent.futures import ThreadPoolExecutor
def square(n):
return n * n
with ThreadPoolExecutor(max_workers=3) as executor:
future1 = executor.submit(square, 2)
future2 = executor.submit(square, 3)
print(future1.result()) # 4
print(future2.result()) # 9
📌 解析
executor.submit(square, 2)提交square(2)任務,回傳Future物件。future1.result()取得square(2)的計算結果。submit()不會等所有任務完成才返回,可以單獨處理每個Future的結果。
📍 submit() 處理錯誤示範
def divide(n, d):
if d == 0:
raise ValueError("❌ 除數不能為 0")
return n / d
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(divide, 10, i) for i in range(3)] # 除以 0 會報錯
for future in futures:
try:
print(future.result()) # 嘗試取得結果
except Exception as e:
print(f"⚠️ 錯誤: {e}") # 捕捉異常
📌 輸出
⚠️ 錯誤: ❌ 除數不能為 0
5.0
10.0
📌 為什麼 submit() 適合處理錯誤?
submit()允許我們個別處理每個任務的錯誤,而不影響其他成功的任務。map()無法捕捉單一錯誤,如果有一個任務失敗,整個map()會失敗。
map():批量提交任務,直接回傳結果
📌 何時使用 map()?
- 適合所有任務的執行時間接近,不需要個別處理回傳值。
- 適合單輸入對應單輸出(One-to-One) 的場景,如數值計算、資料清理。
- 適合所有任務都能成功執行,不需要錯誤處理(如果一個任務失敗,整個
map()會中斷)。
📍 使用 map() 一次提交多個任務
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(square, [1, 2, 3, 4, 5])
print(list(results)) # [1, 4, 9, 16, 25]
📌 解析
executor.map(square, [1, 2, 3, 4, 5])會同時提交 5 個任務。- 不需要
.result(),直接返回結果列表。 map()會等待所有任務完成後才返回結果,因此不能逐步取得結果。
📍 map() 錯誤處理(會導致整個 map() 失敗)
with ThreadPoolExecutor(max_workers=3) as executor:
try:
results = executor.map(divide, [10, 10, 10], [1, 0, 2]) # 除數有 0
print(list(results)) # 這行不會執行,因為 `map()` 會在錯誤發生時中斷
except Exception as e:
print(f"⚠️ map() 發生錯誤: {e}")
📌 輸出
⚠️ map() 發生錯誤: ❌ 除數不能為 0📌 為什麼 map() 不適合錯誤處理?
map()不允許個別處理錯誤,如果一個任務失敗,整個map()會終止。- 若需要處理錯誤,應使用
submit()來逐步提交並捕捉錯誤。
submit() vs. map():哪種方式適合你?
| 功能 | submit() | map() |
|---|---|---|
| 提交方式 | 逐個提交,可手動控制 | 批量提交 |
| 回傳值 | Future 物件(需 .result() 取得) | 直接回傳結果列表 |
| 適合場景 | 需要逐步取得結果、錯誤處理 | 一次性計算多個輸入,無需個別處理 |
| 錯誤處理 | 允許個別處理錯誤 | 若一個任務失敗,整個 map() 會失敗 |
| 彈性 | 高,可個別控制任務 | 低,所有任務一起執行 |
submit() vs. map() 實戰場景
| 應用場景 | 推薦方法 | 原因 |
|---|---|---|
| API 請求(如 OpenAI API) | submit() | 每個 API 請求時間不固定,需單獨處理結果 |
| 網頁爬蟲(Web Scraping) | submit() | 允許部分網頁失敗時,仍繼續執行 |
| 數據計算(如平方運算) | map() | 所有運算時間一致,適合批量計算 |
| 背景任務(如記錄日誌) | submit() | 可逐步處理並寫入檔案 |
ThreadPoolExecutor 的應用場景
ThreadPoolExecutor 在 Python 的 I/O 密集型(I/O-Bound)工作 中非常實用,它允許我們同時執行多個執行緒,提高處理效率。
以下是幾個適合使用 ThreadPoolExecutor 的情境:
ThreadPoolExecutor 適用的場景
| 應用場景 | 為什麼適合使用 ThreadPoolExecutor? | 示例應用 |
|---|---|---|
| API 請求(如 OpenAI API) | 允許我們同時發送多個 API 請求,提高處理效率,避免單執行緒等待。 | – 解析大量用戶評論並發送給 OpenAI API 優化文案。- 同時請求多個第三方 API 來獲取數據(如天氣查詢、多個新聞來源)。 |
| 網路爬蟲(Web Scraping) | 在爬取網頁時,許多時間都花在等待伺服器回應,多執行緒能讓我們同時爬取多個網頁,加快速度。 | – 從多個新聞網站爬取文章內容。- 批量下載圖片、影片或 PDF 文件。 |
| 檔案 I/O 處理(檔案讀寫) | 當我們需要讀取或寫入大量檔案時,透過多執行緒可以同時處理多個文件,減少等待時間。 | – 讀取多個大型日誌文件,並過濾出錯誤訊息。- 批量處理 CSV、JSON 檔案,進行格式轉換或數據清理。 |
| 背景任務(Background Tasks) | 讓程式的主執行緒可以專注於主要任務,而將次要任務交給 ThreadPoolExecutor 執行,避免 UI 或主流程卡住。 | – 遊戲內的數據同步(如排行榜更新)。- UI 應用程式執行自動備份,不影響使用者操作。 |
ThreadPoolExecutor 應用實戰示例
🔸 同時發送多個 API 請求
from concurrent.futures import ThreadPoolExecutor
import requests
API_URLS = [
"https://jsonplaceholder.typicode.com/todos/1",
"https://jsonplaceholder.typicode.com/todos/2",
"https://jsonplaceholder.typicode.com/todos/3"
]
def fetch_data(url):
response = requests.get(url)
return response.json()
# 使用 ThreadPoolExecutor 同時請求多個 API
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(fetch_data, API_URLS))
print(results) # 取得三個 API 回應的 JSON 數據
📌 這樣的做法比單執行緒依序請求快上許多,特別是在需要等待 API 回應的情況下。
🔸 爬取多個網頁內容
from concurrent.futures import ThreadPoolExecutor
import requests
URLS = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3",
]
def scrape_page(url):
response = requests.get(url)
return f"{url} 爬取完成,狀態碼: {response.status_code}"
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(scrape_page, URLS))
print("\n".join(results))
📌 多執行緒讓爬蟲能夠同時發送多個請求,減少等待時間,加快爬取效率。
🔸 批量處理檔案
from concurrent.futures import ThreadPoolExecutor
import os
def read_file(file_path):
with open(file_path, "r", encoding="utf-8") as file:
return file.read()
files = ["file1.txt", "file2.txt", "file3.txt"]
with ThreadPoolExecutor(max_workers=3) as executor:
contents = list(executor.map(read_file, files))
print(contents) # 讀取多個文件的內容
📌 當我們需要同時讀取或寫入多個檔案時,多執行緒可以顯著提升效率。
ThreadPoolExecutor vs. ProcessPoolExecutor
Python 也提供 ProcessPoolExecutor 來處理 CPU 密集型(CPU-Bound) 工作。兩者的主要區別如下:
ThreadPoolExecutor vs. ProcessPoolExecutor 比較
| 特性 | ThreadPoolExecutor | ProcessPoolExecutor |
|---|---|---|
| 適用場景 | I/O 密集型(API、爬蟲、檔案讀寫) | CPU 密集型(數據計算、影像處理) |
| 資源使用 | 多個 執行緒(Threads),共享記憶體 | 多個 進程(Processes),獨立記憶體 |
| 受 GIL 限制? | 是(適用於 I/O 操作) | 否(適用於 CPU 計算) |
| 建立開銷 | 低(執行緒切換較快) | 高(需要額外的記憶體空間與 IPC 通訊) |
| 適用例子 | 同時發送 API 請求、讀取多個檔案、爬取網頁 | 平行數據計算、機器學習訓練、大型矩陣運算 |
📌 結論:
- 如果你的工作主要是等待(例如 API、爬蟲、檔案讀寫)➡ 使用
ThreadPoolExecutor - 如果你的工作需要大量計算(例如影像處理、數據分析)➡ 使用
ProcessPoolExecutor
ProcessPoolExecutor 適用場景示例
🔸 CPU 密集型工作
from concurrent.futures import ProcessPoolExecutor
def heavy_computation(n):
return sum(i**2 for i in range(n))
numbers = [1000000, 2000000, 3000000]
with ProcessPoolExecutor(max_workers=3) as executor:
results = list(executor.map(heavy_computation, numbers))
print(results) # 處理三個大數據計算任務
📌 如果我們用 ThreadPoolExecutor,GIL 會限制 Python 只能一次運行一個執行緒,因此效果不佳。
最佳實踐與潛在問題
最佳實踐
- 控制
max_workers避免過多執行緒- 設定
max_workers不宜過高,通常為CPU 核心數 * 2。
- 設定
- 使用
submit()處理回傳值- 如果需要獲取任務結果,
submit()會比map()更靈活。
- 如果需要獲取任務結果,
- 確保使用
with語法with ThreadPoolExecutor() as executor:會自動關閉執行緒池,避免資源浪費。
潛在問題
| 問題 | 解決方案 |
|---|---|
| 大量執行緒併發,導致記憶體耗盡 | 適當限制 max_workers |
| 共享變數時發生 Race Condition | 使用 threading.Lock() 保護變數 |
| 網路請求異常(如超時) | 使用 try-except 捕捉錯誤 |
總結
✅ ThreadPoolExecutor 提供簡單高效的方式來執行多個執行緒
✅ 適合用於I/O 密集型任務(API 請求、網頁爬取、檔案處理)
✅ max_workers 控制同時執行的執行緒數量,避免資源耗盡
✅ 對 CPU 密集型任務,應改用 ProcessPoolExecutor 🚀