本文為 Python API 優化基礎 系列文,第 4 篇:
- 什麼是 asyncio?——Python 的非同步編程核心
- 高效能快取解決方案——深入解析 AioCache 套件
- 執行緒(Thread)是什麼?完整解析多執行緒與進程的差異與應用
- 完整解析 ThreadPoolExecutor:Python 高效併發工具 👈進度
- CPU 運算 vs. I/O 操作:執行緒與進程的最佳實踐
- ThreadPoolExecutor vs. asyncio:完整解析與實戰示例
在 Python 開發中,當我們需要同時執行多個任務時,ThreadPoolExecutor 是一個簡單且高效的工具。
它可以管理多個執行緒(Threads),讓程式能夠更快地處理大量 I/O 操作,如 API 請求、檔案讀取、網路爬取等。
ThreadPoolExecutor 是什麼?
ThreadPoolExecutor 是 Python concurrent.futures 模組的一部分,它提供了一種管理執行緒池(Thread Pool)的方式。
ThreadPoolExecutor 的作用
當我們想要執行多個執行緒時,有兩種方式:
- 手動建立執行緒(使用
threading.Thread) - 使用
ThreadPoolExecutor來自動管理執行緒
舉例來說,假設我們有 10 個 API 請求需要發送:
- 單執行緒:依序執行 10 次 API 請求,總時間較長。
- 多執行緒(Threading):可以同時發送 10 個 API 請求,加快速度。
- 使用
ThreadPoolExecutor:讓 Python 幫我們自動管理執行緒,提高效能且降低複雜度。
ThreadPoolExecutor 的基本用法
讓我們從最基本的例子開始,看看如何使用 ThreadPoolExecutor 來執行多個任務。
建立執行緒池
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
""" 模擬一個需要 2 秒的工作 """
print(f"🔄 任務 {n} 開始執行")
time.sleep(2)
print(f"✅ 任務 {n} 完成")
# 建立一個執行緒池,最多允許 3 個執行緒同時運行
with ThreadPoolExecutor(max_workers=3) as executor:
executor.map(task, range(5)) # 執行 5 個任務
輸出結果
🔄 任務 0 開始執行
🔄 任務 1 開始執行
🔄 任務 2 開始執行
✅ 任務 0 完成
✅ 任務 1 完成
✅ 任務 2 完成
🔄 任務 3 開始執行
🔄 任務 4 開始執行
✅ 任務 3 完成
✅ 任務 4 完成
說明
max_workers=3:最多同時執行 3 個任務,當有執行緒完成時,下一個任務才會開始執行。executor.map(task, range(5)):將task()應用到 0~4 共 5 個任務,但最多 3 個同時執行。
submit() vs. map():ThreadPoolExecutor 任務提交方式解析
在 ThreadPoolExecutor 中,我們可以使用 submit() 或 map() 來提交任務,兩者在行為上有所不同,各有適用的場景。
submit() vs. map() 差異總覽
submit():逐個提交任務,回傳 Future 物件
📌 何時使用 submit()?
- 適合需要異步獲取結果的場景(例如某些任務可能比較慢,可以先處理快的)。
- 適合需要錯誤處理的場景(如果某些任務失敗,我們可以只重試這些失敗的任務)。
- 適合需要動態控制執行緒池內部的工作量(不像
map()一次提交所有任務)。
📍 使用 submit() 提交多個任務
from concurrent.futures import ThreadPoolExecutor
def square(n):
return n * n
with ThreadPoolExecutor(max_workers=3) as executor:
future1 = executor.submit(square, 2)
future2 = executor.submit(square, 3)
print(future1.result()) # 4
print(future2.result()) # 9
📌 解析
executor.submit(square, 2)提交square(2)任務,回傳Future物件。future1.result()取得square(2)的計算結果。submit()不會等所有任務完成才返回,可以單獨處理每個Future的結果。
📍 submit() 處理錯誤示範
def divide(n, d):
if d == 0:
raise ValueError("❌ 除數不能為 0")
return n / d
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(divide, 10, i) for i in range(3)] # 除以 0 會報錯
for future in futures:
try:
print(future.result()) # 嘗試取得結果
except Exception as e:
print(f"⚠️ 錯誤: {e}") # 捕捉異常
📌 輸出
⚠️ 錯誤: ❌ 除數不能為 0
5.0
10.0
📌 為什麼 submit() 適合處理錯誤?
submit()允許我們個別處理每個任務的錯誤,而不影響其他成功的任務。map()無法捕捉單一錯誤,如果有一個任務失敗,整個map()會失敗。
map():批量提交任務,直接回傳結果
📌 何時使用 map()?
- 適合所有任務的執行時間接近,不需要個別處理回傳值。
- 適合單輸入對應單輸出(One-to-One) 的場景,如數值計算、資料清理。
- 適合所有任務都能成功執行,不需要錯誤處理(如果一個任務失敗,整個
map()會中斷)。
📍 使用 map() 一次提交多個任務
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(square, [1, 2, 3, 4, 5])
print(list(results)) # [1, 4, 9, 16, 25]
📌 解析
executor.map(square, [1, 2, 3, 4, 5])會同時提交 5 個任務。- 不需要
.result(),直接返回結果列表。 map()會等待所有任務完成後才返回結果,因此不能逐步取得結果。
📍 map() 錯誤處理(會導致整個 map() 失敗)
with ThreadPoolExecutor(max_workers=3) as executor:
try:
results = executor.map(divide, [10, 10, 10], [1, 0, 2]) # 除數有 0
print(list(results)) # 這行不會執行,因為 `map()` 會在錯誤發生時中斷
except Exception as e:
print(f"⚠️ map() 發生錯誤: {e}")
📌 輸出
⚠️ map() 發生錯誤: ❌ 除數不能為 0📌 為什麼 map() 不適合錯誤處理?
map()不允許個別處理錯誤,如果一個任務失敗,整個map()會終止。- 若需要處理錯誤,應使用
submit()來逐步提交並捕捉錯誤。
submit() vs. map():哪種方式適合你?
submit() vs. map() 實戰場景
ThreadPoolExecutor 的應用場景
ThreadPoolExecutor 在 Python 的 I/O 密集型(I/O-Bound)工作 中非常實用,它允許我們同時執行多個執行緒,提高處理效率。
以下是幾個適合使用 ThreadPoolExecutor 的情境:
ThreadPoolExecutor 適用的場景
ThreadPoolExecutor 應用實戰示例
🔸 同時發送多個 API 請求
from concurrent.futures import ThreadPoolExecutor
import requests
API_URLS = [
"https://jsonplaceholder.typicode.com/todos/1",
"https://jsonplaceholder.typicode.com/todos/2",
"https://jsonplaceholder.typicode.com/todos/3"
]
def fetch_data(url):
response = requests.get(url)
return response.json()
# 使用 ThreadPoolExecutor 同時請求多個 API
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(fetch_data, API_URLS))
print(results) # 取得三個 API 回應的 JSON 數據
📌 這樣的做法比單執行緒依序請求快上許多,特別是在需要等待 API 回應的情況下。
🔸 爬取多個網頁內容
from concurrent.futures import ThreadPoolExecutor
import requests
URLS = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3",
]
def scrape_page(url):
response = requests.get(url)
return f"{url} 爬取完成,狀態碼: {response.status_code}"
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(scrape_page, URLS))
print("\n".join(results))
📌 多執行緒讓爬蟲能夠同時發送多個請求,減少等待時間,加快爬取效率。
🔸 批量處理檔案
from concurrent.futures import ThreadPoolExecutor
import os
def read_file(file_path):
with open(file_path, "r", encoding="utf-8") as file:
return file.read()
files = ["file1.txt", "file2.txt", "file3.txt"]
with ThreadPoolExecutor(max_workers=3) as executor:
contents = list(executor.map(read_file, files))
print(contents) # 讀取多個文件的內容
📌 當我們需要同時讀取或寫入多個檔案時,多執行緒可以顯著提升效率。
ThreadPoolExecutor vs. ProcessPoolExecutor
Python 也提供 ProcessPoolExecutor 來處理 CPU 密集型(CPU-Bound) 工作。兩者的主要區別如下:
ThreadPoolExecutor vs. ProcessPoolExecutor 比較
📌 結論:
- 如果你的工作主要是等待(例如 API、爬蟲、檔案讀寫)➡ 使用
ThreadPoolExecutor - 如果你的工作需要大量計算(例如影像處理、數據分析)➡ 使用
ProcessPoolExecutor
ProcessPoolExecutor 適用場景示例
🔸 CPU 密集型工作
from concurrent.futures import ProcessPoolExecutor
def heavy_computation(n):
return sum(i**2 for i in range(n))
numbers = [1000000, 2000000, 3000000]
with ProcessPoolExecutor(max_workers=3) as executor:
results = list(executor.map(heavy_computation, numbers))
print(results) # 處理三個大數據計算任務
📌 如果我們用 ThreadPoolExecutor,GIL 會限制 Python 只能一次運行一個執行緒,因此效果不佳。
最佳實踐與潛在問題
最佳實踐
- 控制
max_workers避免過多執行緒- 設定
max_workers不宜過高,通常為CPU 核心數 * 2。
- 設定
- 使用
submit()處理回傳值- 如果需要獲取任務結果,
submit()會比map()更靈活。
- 如果需要獲取任務結果,
- 確保使用
with語法with ThreadPoolExecutor() as executor:會自動關閉執行緒池,避免資源浪費。
潛在問題
總結
✅ ThreadPoolExecutor 提供簡單高效的方式來執行多個執行緒
✅ 適合用於I/O 密集型任務(API 請求、網頁爬取、檔案處理)
✅ max_workers 控制同時執行的執行緒數量,避免資源耗盡
✅ 對 CPU 密集型任務,應改用 ProcessPoolExecutor 🚀