完整解析 ThreadPoolExecutor:Python 高效併發工具

Published February 13, 2025 by 徐培鈞
Python

在 Python 開發中,當我們需要同時執行多個任務時,ThreadPoolExecutor 是一個簡單且高效的工具

它可以管理多個執行緒(Threads),讓程式能夠更快地處理大量 I/O 操作,如 API 請求、檔案讀取、網路爬取等。


ThreadPoolExecutor 是什麼?

ThreadPoolExecutor 是 Python concurrent.futures 模組的一部分,它提供了一種管理執行緒池(Thread Pool)的方式。

ThreadPoolExecutor 的作用

當我們想要執行多個執行緒時,有兩種方式:

  1. 手動建立執行緒(使用 threading.Thread
  2. 使用 ThreadPoolExecutor 來自動管理執行緒

舉例來說,假設我們有 10 個 API 請求需要發送:

  • 單執行緒:依序執行 10 次 API 請求,總時間較長。
  • 多執行緒(Threading):可以同時發送 10 個 API 請求,加快速度。
  • 使用 ThreadPoolExecutor:讓 Python 幫我們自動管理執行緒,提高效能且降低複雜度。

ThreadPoolExecutor 的基本用法

讓我們從最基本的例子開始,看看如何使用 ThreadPoolExecutor 來執行多個任務。

建立執行緒池

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    """ 模擬一個需要 2 秒的工作 """
    print(f"🔄 任務 {n} 開始執行")
    time.sleep(2)
    print(f"✅ 任務 {n} 完成")

# 建立一個執行緒池,最多允許 3 個執行緒同時運行
with ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(task, range(5))  # 執行 5 個任務

輸出結果

🔄 任務 0 開始執行
🔄 任務 1 開始執行
🔄 任務 2 開始執行
 任務 0 完成
 任務 1 完成
 任務 2 完成
🔄 任務 3 開始執行
🔄 任務 4 開始執行
 任務 3 完成
 任務 4 完成

說明

  • max_workers=3:最多同時執行 3 個任務,當有執行緒完成時,下一個任務才會開始執行。
  • executor.map(task, range(5)):將 task() 應用到 0~4 共 5 個任務但最多 3 個同時執行

submit() vs. map()ThreadPoolExecutor 任務提交方式解析

ThreadPoolExecutor 中,我們可以使用 submit()map() 來提交任務,兩者在行為上有所不同,各有適用的場景。

submit() vs. map() 差異總覽

submit()逐個提交(單次呼叫處理單一任務)
map()批量提交(單次呼叫處理多個任務)
submit()Future 物件(可手動獲取結果)
map()直接回傳結果(類似 map() 函式)
submit()需要逐步處理結果,如異常處理、錯誤重試
map()單輸入對應單輸出的批量處理,如數據運算
submit()可以(每個 Future 可單獨 .result())
map()不可以(需等待所有結果計算完畢後才返回)
submit()更靈活,可在 Future 物件中捕捉錯誤
map()所有任務會同時執行,無法單獨處理錯誤

submit():逐個提交任務,回傳 Future 物件

📌 何時使用 submit()

  • 適合需要異步獲取結果的場景(例如某些任務可能比較慢,可以先處理快的)。
  • 適合需要錯誤處理的場景(如果某些任務失敗,我們可以只重試這些失敗的任務)。
  • 適合需要動態控制執行緒池內部的工作量(不像 map() 一次提交所有任務)。

📍 使用 submit() 提交多個任務

from concurrent.futures import ThreadPoolExecutor

def square(n):
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    future1 = executor.submit(square, 2)
    future2 = executor.submit(square, 3)

    print(future1.result())  # 4
    print(future2.result())  # 9

📌 解析

  1. executor.submit(square, 2) 提交 square(2) 任務,回傳 Future 物件
  2. future1.result() 取得 square(2) 的計算結果。
  3. submit() 不會等所有任務完成才返回,可以單獨處理每個 Future 的結果。

📍 submit() 處理錯誤示範

def divide(n, d):
    if d == 0:
        raise ValueError("❌ 除數不能為 0")
    return n / d

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(divide, 10, i) for i in range(3)]  # 除以 0 會報錯

    for future in futures:
        try:
            print(future.result())  # 嘗試取得結果
        except Exception as e:
            print(f"⚠️ 錯誤: {e}")  # 捕捉異常

📌 輸出

⚠️ 錯誤:  除數不能為 0
5.0
10.0 

📌 為什麼 submit() 適合處理錯誤?

  • submit() 允許我們個別處理每個任務的錯誤,而不影響其他成功的任務。
  • map() 無法捕捉單一錯誤,如果有一個任務失敗,整個 map() 會失敗。

map():批量提交任務,直接回傳結果

📌 何時使用 map()

  • 適合所有任務的執行時間接近,不需要個別處理回傳值。
  • 適合單輸入對應單輸出(One-to-One) 的場景,如數值計算、資料清理。
  • 適合所有任務都能成功執行,不需要錯誤處理(如果一個任務失敗,整個 map() 會中斷)。

📍 使用 map() 一次提交多個任務

with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(square, [1, 2, 3, 4, 5])
    print(list(results))  # [1, 4, 9, 16, 25]

📌 解析

  • executor.map(square, [1, 2, 3, 4, 5]) 會同時提交 5 個任務
  • 不需要 .result(),直接返回結果列表
  • map() 會等待所有任務完成後才返回結果,因此不能逐步取得結果。

📍 map() 錯誤處理(會導致整個 map() 失敗)

with ThreadPoolExecutor(max_workers=3) as executor:
    try:
        results = executor.map(divide, [10, 10, 10], [1, 0, 2])  # 除數有 0
        print(list(results))  # 這行不會執行,因為 `map()` 會在錯誤發生時中斷
    except Exception as e:
        print(f"⚠️ map() 發生錯誤: {e}")

📌 輸出

⚠️ map() 發生錯誤:  除數不能為 0

📌 為什麼 map() 不適合錯誤處理?

  • map() 不允許個別處理錯誤,如果一個任務失敗,整個 map() 會終止。
  • 若需要處理錯誤,應使用 submit() 來逐步提交並捕捉錯誤。

submit() vs. map():哪種方式適合你?

submit()逐個提交,可手動控制
map()批量提交
submit()Future 物件(需 .result() 取得)
map()直接回傳結果列表
submit()需要逐步取得結果、錯誤處理
map()一次性計算多個輸入,無需個別處理
submit()允許個別處理錯誤
map()若一個任務失敗,整個 map() 會失敗
submit()高,可個別控制任務
map()低,所有任務一起執行

submit() vs. map() 實戰場景

推薦方法submit()
原因每個 API 請求時間不固定,需單獨處理結果
推薦方法submit()
原因允許部分網頁失敗時,仍繼續執行
推薦方法map()
原因所有運算時間一致,適合批量計算
推薦方法submit()
原因可逐步處理並寫入檔案

ThreadPoolExecutor 的應用場景

ThreadPoolExecutor 在 Python 的 I/O 密集型(I/O-Bound)工作 中非常實用,它允許我們同時執行多個執行緒,提高處理效率。

以下是幾個適合使用 ThreadPoolExecutor 的情境:

ThreadPoolExecutor 適用的場景

為什麼適合使用 ThreadPoolExecutor?允許我們同時發送多個 API 請求,提高處理效率,避免單執行緒等待。
示例應用– 解析大量用戶評論並發送給 OpenAI API 優化文案。- 同時請求多個第三方 API 來獲取數據(如天氣查詢、多個新聞來源)。
為什麼適合使用 ThreadPoolExecutor?在爬取網頁時,許多時間都花在等待伺服器回應,多執行緒能讓我們同時爬取多個網頁,加快速度。
示例應用– 從多個新聞網站爬取文章內容。- 批量下載圖片、影片或 PDF 文件。
為什麼適合使用 ThreadPoolExecutor?當我們需要讀取或寫入大量檔案時,透過多執行緒可以同時處理多個文件,減少等待時間。
示例應用– 讀取多個大型日誌文件,並過濾出錯誤訊息。- 批量處理 CSV、JSON 檔案,進行格式轉換或數據清理。
為什麼適合使用 ThreadPoolExecutor?讓程式的主執行緒可以專注於主要任務,而將次要任務交給 ThreadPoolExecutor 執行,避免 UI 或主流程卡住。
示例應用– 遊戲內的數據同步(如排行榜更新)。- UI 應用程式執行自動備份,不影響使用者操作。

ThreadPoolExecutor 應用實戰示例

🔸 同時發送多個 API 請求

from concurrent.futures import ThreadPoolExecutor
import requests

API_URLS = [
    "https://jsonplaceholder.typicode.com/todos/1",
    "https://jsonplaceholder.typicode.com/todos/2",
    "https://jsonplaceholder.typicode.com/todos/3"
]

def fetch_data(url):
    response = requests.get(url)
    return response.json()

# 使用 ThreadPoolExecutor 同時請求多個 API
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(fetch_data, API_URLS))

print(results)  # 取得三個 API 回應的 JSON 數據

📌 這樣的做法比單執行緒依序請求快上許多,特別是在需要等待 API 回應的情況下。

🔸 爬取多個網頁內容

from concurrent.futures import ThreadPoolExecutor
import requests

URLS = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3",
]

def scrape_page(url):
    response = requests.get(url)
    return f"{url} 爬取完成,狀態碼: {response.status_code}"

with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(scrape_page, URLS))

print("\n".join(results))

📌 多執行緒讓爬蟲能夠同時發送多個請求,減少等待時間,加快爬取效率。

🔸 批量處理檔案

from concurrent.futures import ThreadPoolExecutor
import os

def read_file(file_path):
    with open(file_path, "r", encoding="utf-8") as file:
        return file.read()

files = ["file1.txt", "file2.txt", "file3.txt"]

with ThreadPoolExecutor(max_workers=3) as executor:
    contents = list(executor.map(read_file, files))

print(contents)  # 讀取多個文件的內容

📌 當我們需要同時讀取或寫入多個檔案時,多執行緒可以顯著提升效率。

ThreadPoolExecutor vs. ProcessPoolExecutor

Python 也提供 ProcessPoolExecutor 來處理 CPU 密集型(CPU-Bound) 工作。兩者的主要區別如下:

ThreadPoolExecutor vs. ProcessPoolExecutor 比較

ThreadPoolExecutorI/O 密集型(API、爬蟲、檔案讀寫)
ProcessPoolExecutorCPU 密集型(數據計算、影像處理)
ThreadPoolExecutor多個 執行緒(Threads),共享記憶體
ProcessPoolExecutor多個 進程(Processes),獨立記憶體
ThreadPoolExecutor是(適用於 I/O 操作)
ProcessPoolExecutor否(適用於 CPU 計算)
ThreadPoolExecutor低(執行緒切換較快)
ProcessPoolExecutor高(需要額外的記憶體空間與 IPC 通訊)
ThreadPoolExecutor同時發送 API 請求、讀取多個檔案、爬取網頁
ProcessPoolExecutor平行數據計算、機器學習訓練、大型矩陣運算

📌 結論:

  • 如果你的工作主要是等待(例如 API、爬蟲、檔案讀寫)➡ 使用 ThreadPoolExecutor
  • 如果你的工作需要大量計算(例如影像處理、數據分析)➡ 使用 ProcessPoolExecutor

ProcessPoolExecutor 適用場景示例

🔸 CPU 密集型工作

from concurrent.futures import ProcessPoolExecutor

def heavy_computation(n):
    return sum(i**2 for i in range(n))

numbers = [1000000, 2000000, 3000000]

with ProcessPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(heavy_computation, numbers))

print(results)  # 處理三個大數據計算任務

📌 如果我們用 ThreadPoolExecutor,GIL 會限制 Python 只能一次運行一個執行緒,因此效果不佳。


最佳實踐與潛在問題

最佳實踐

  1. 控制 max_workers 避免過多執行緒
    • 設定 max_workers 不宜過高,通常為 CPU 核心數 * 2
  2. 使用 submit() 處理回傳值
    • 如果需要獲取任務結果,submit() 會比 map() 更靈活。
  3. 確保使用 with 語法
    • with ThreadPoolExecutor() as executor: 會自動關閉執行緒池,避免資源浪費。

潛在問題

解決方案適當限制 max_workers
解決方案使用 threading.Lock() 保護變數
解決方案使用 try-except 捕捉錯誤

總結

ThreadPoolExecutor 提供簡單高效的方式來執行多個執行緒
✅ 適合用於I/O 密集型任務(API 請求、網頁爬取、檔案處理)
max_workers 控制同時執行的執行緒數量,避免資源耗盡
對 CPU 密集型任務,應改用 ProcessPoolExecutor 🚀