ThreadPoolExecutor vs. asyncio:完整解析與實戰示例

更新日期: 2025 年 2 月 13 日

在 Python 中,我們有兩種方式來提高 I/O 操作的效率:

  1. 多執行緒(Threading): 使用 ThreadPoolExecutor並行執行同步函式(如 requests)。
  2. 非同步(Asynchronous): 使用 asyncio非同步執行支援 async 的函式(如 aiohttp)。

這兩者的核心概念不同,許多開發者容易混淆:

  • ThreadPoolExecutor 使用多執行緒來並行運行同步函式
  • asyncio 使用單執行緒的事件迴圈來切換非同步函式

ThreadPoolExecutor(多執行緒)

什麼是多執行緒(Threading)?

執行緒(Thread) 是程式執行的最小單位,透過多執行緒(Multithreading),我們可以同時執行多個任務,提高效率。

ThreadPoolExecutor 如何運作?

  • 主要目標:在同步環境中,提高 I/O 操作的執行效率。
  • 工作方式:使用多個執行緒同時執行不同的任務,讓程式看起來「同時」處理多個請求。
  • 適用場景
    • 多個 API 請求(使用 requests
    • 批量讀取或寫入檔案
    • 與資料庫進行多個查詢

ThreadPoolExecutor 示例

📍 讓同步 requests 並行運行

import requests
from concurrent.futures import ThreadPoolExecutor

URLS = [
    "https://jsonplaceholder.typicode.com/todos/1",
    "https://jsonplaceholder.typicode.com/todos/2",
    "https://jsonplaceholder.typicode.com/todos/3",
]

def fetch(url):
    """發送 HTTP 請求"""
    response = requests.get(url)
    return response.json()

# 使用 ThreadPoolExecutor 讓多個請求同時執行
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(fetch, URLS))

print(results)

詳細解析

當我們使用 requests.get(url) 發送 API 請求時,它是一個「同步(Blocking)操作」,這意味著:

  • requests.get(url) 被呼叫時,程式會停下來等待伺服器回應,直到收到完整的回應後,才會繼續執行下一行程式碼。
  • 如果我們需要發送多個請求,預設情況下,它們會一個接一個依序執行,導致效率低落。

為了解決這個問題,我們可以使用 ThreadPoolExecutor,透過「多執行緒(Threading)」來並行發送多個 API 請求,提高執行效率。

📍requests.get(url) 是同步(Blocking)操作

我們先看一個單執行緒發送 API 請求的範例:

import requests
import time

URLS = [
    "https://jsonplaceholder.typicode.com/todos/1",
    "https://jsonplaceholder.typicode.com/todos/2",
    "https://jsonplaceholder.typicode.com/todos/3",
]

def fetch(url):
    print(f"🔄 發送請求: {url}")
    response = requests.get(url)  # 這是一個同步操作,程式會卡在這裡
    print(f"✅ 完成請求: {url}")
    return response.json()

start_time = time.time()
results = [fetch(url) for url in URLS]  # 依序發送 3 個請求
end_time = time.time()

print(f"總執行時間: {end_time - start_time:.2f} 秒")
  • requests.get(url) 一次只能發送一個請求,程式必須等這個請求完成後,才能發送下一個請求。
  • 如果每個請求都需要 1 秒,那麼總執行時間至少 3 秒
  • 問題:如果我們有 1000 個 API 請求,這樣的方式會非常慢。

📍ThreadPoolExecutor 如何加速 API 請求?

為了解決「同步操作導致等待時間過長」的問題,我們可以使用 ThreadPoolExecutor,透過多執行緒(Threading) 讓程式同時發送多個請求

ThreadPoolExecutorrequests 同時發送多個請求

import requests
import time
from concurrent.futures import ThreadPoolExecutor

URLS = [
    "https://jsonplaceholder.typicode.com/todos/1",
    "https://jsonplaceholder.typicode.com/todos/2",
    "https://jsonplaceholder.typicode.com/todos/3",
]

def fetch(url):
    print(f"🔄 發送請求: {url}")
    response = requests.get(url)  # 這是一個同步操作,但透過 ThreadPoolExecutor 讓它並行執行
    print(f"✅ 完成請求: {url}")
    return response.json()

start_time = time.time()

# 使用 ThreadPoolExecutor 讓 3 個請求同時執行
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(fetch, URLS))

end_time = time.time()
print(f"總執行時間: {end_time - start_time:.2f} 秒")
  1. ThreadPoolExecutor(max_workers=3) 開 3 個執行緒
    • 這意味著 Python 會同時啟動 3 個執行緒 來執行 fetch(url) 函式。
    • 每個執行緒負責 1 個 API 請求,不需要等其他請求完成才能開始新的請求。
  2. executor.map(fetch, URLS) 讓多個請求同時執行
    • executor.map() 會自動把 URLS 中的每個網址 分配給執行緒,讓它們並行發送請求
    • 這樣,我們的 3 個請求可以「同時發送」,不需要等上一個請求完成後才開始下一個。
  3. 執行時間大幅縮短
    • 在「單執行緒」的情況下,3 個請求需要 3 秒(1 秒/請求)
    • 使用 ThreadPoolExecutor3 個請求可以同時執行,總時間約 1 秒
    • 節省了 2/3 的時間,提高了 3 倍效能!

📍ThreadPoolExecutor vs. 單執行緒對比

方法執行方式總執行時間(假設每個請求 1 秒)
單執行緒(同步)逐個請求,等待回應後才發送下一個⏳ 3 秒
ThreadPoolExecutor(多執行緒)同時發送 3 個請求🚀 1 秒

這就是 ThreadPoolExecutor 的強大之處!

它不會讓 requests.get(url) 變成非同步函式,但它透過多執行緒讓多個同步函式同時執行,大幅提升效能。


asyncio(非同步事件驅動)

什麼是非同步(Asynchronous)?

非同步(Asynchronous) 是一種事件驅動(Event-Driven) 模型,允許我們在等待 I/O 操作時切換任務,而不是讓 CPU 閒置。

asyncio 如何運作?

  • 主要目標:在單一執行緒內,同時執行多個 I/O 任務,而不會互相阻塞。
  • 工作方式
    • asyncio 使用事件迴圈(Event Loop) 來管理非同步函式(coroutines)。
    • 當一個函式遇到 await 時,程式會「暫停」該函式,然後去執行其他任務。
    • asyncio 不會自動使用多執行緒或多進程,它完全依賴單一執行緒 + 任務切換

asyncio 示例

📍 讓 aiohttp 非同步發送 API 請求

import aiohttp
import asyncio

URLS = [
    "https://jsonplaceholder.typicode.com/todos/1",
    "https://jsonplaceholder.typicode.com/todos/2",
    "https://jsonplaceholder.typicode.com/todos/3",
]

async def fetch(session, url):
    """使用 aiohttp 非同步發送請求"""
    async with session.get(url) as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in URLS]
        results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())

詳細解析

當我們使用 asyncio 搭配 aiohttp 來發送 HTTP 請求時,與傳統的 requests 模組有很大不同。讓我們逐步拆解這段程式碼的運作方式。

📍 aiohttpasyncio 兼容的 HTTP 請求庫

在 Python 中,最常見的 HTTP 請求庫是 requests,但 requests.get(url)同步(Blocking)操作,會阻塞整個程式,直到伺服器回應後才能繼續執行。

aiohttp 則是一個專為 asyncio 設計的非同步 HTTP 庫,它允許我們使用 asyncawait 來非同步發送 HTTP 請求,而不會卡住程式。

🔹requests(同步,會阻塞)
import requests

def fetch(url):
    response = requests.get(url)
    return response.json()

print(fetch("https://jsonplaceholder.typicode.com/todos/1"))  # 這行會等待請求完成

問題:

  • requests.get(url) 是同步的,請求發送後必須等伺服器回應,程式才會繼續執行。
  • 如果有多個 API 請求,這些請求會「依序執行」,導致效率低落。
🔹 aiohttp(非同步,不會阻塞)
import aiohttp
import asyncio

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

print(asyncio.run(fetch("https://jsonplaceholder.typicode.com/todos/1")))  

為什麼 aiohttp 更好?

  • async with aiohttp.ClientSession() 允許在非同步環境下建立 HTTP 連線
  • await response.json() 不會阻塞其他任務,當 fetch() 等待回應時,事件迴圈可以執行其他任務。
  • 適合大量 API 請求,不會一個一個等待,讓程式變得更高效。

📍 asyncio.gather() 讓所有請求同時發送」,而不會等待彼此完成

通常在同步環境下,我們需要一個一個發送請求:

results = [fetch(url) for url in URLS]  # 這會逐個發送請求

asyncio.gather() 允許我們一次性發送所有請求,並等待它們的結果

🔹asyncio.gather() 並行處理
import aiohttp
import asyncio

URLS = [
    "https://jsonplaceholder.typicode.com/todos/1",
    "https://jsonplaceholder.typicode.com/todos/2",
    "https://jsonplaceholder.typicode.com/todos/3",
]

async def fetch(session, url):
    """ 非同步發送 HTTP 請求 """
    async with session.get(url) as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in URLS]  # 建立任務列表
        results = await asyncio.gather(*tasks)  # 並行執行所有任務
    print(results)

asyncio.run(main())
  1. tasks = [fetch(session, url) for url in URLS]
    • 建立一個「待執行的非同步函式列表」,但這些函式還沒執行。
  2. await asyncio.gather(*tasks)
    • 讓所有 fetch() 任務同時執行,不需要等一個請求完成後再發送下一個。
    • 這與傳統同步程式的「一個一個請求」完全不同。
🔹 asyncio.gather() 的關鍵優勢
方法請求方式效率
同步(requests)逐個發送,每個請求都要等待前一個完成
非同步 asyncio.gather()一次發送所有請求,不互相等待

這讓 Python 可以像 JavaScriptPromise.all() 一樣,並行執行多個請求!

📍ThreadPoolExecutor 傳統同步環境

在傳統的同步環境中,ThreadPoolExecutor 允許我們並行執行同步函式,例如 requests.get()

from concurrent.futures import ThreadPoolExecutor
import requests

def fetch(url):
    return requests.get(url).json()

with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(fetch, URLS))

問題

  • ThreadPoolExecutor 是多執行緒技術,需要開啟多個執行緒來處理同步任務。
  • requests.get(url) 仍然是同步函式,只是分配到不同的執行緒上執行,效能提升有限。
  • 不是真正的非同步運行!
🔹 aiohttp + asyncio 不需要 ThreadPoolExecutor
async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in URLS]
        results = await asyncio.gather(*tasks)  
    print(results)

asyncio.run(main())  

這樣的做法完全不需要 ThreadPoolExecutor,因為 aiohttp 原生支援 asyncio,能夠真正做到非同步處理。


ThreadPoolExecutor vs. asyncio 的核心區別

特性ThreadPoolExecutor(多執行緒)asyncio(非同步)
並行方式多個執行緒單執行緒 + 事件迴圈
適合場景同步 I/O 任務(如 requests.get()、檔案讀寫)支援 async 的函式(如 aiohttp)
是否受 GIL 限制是(但 I/O 任務影響不大)否(非同步處理,不影響 GIL)
是否適合 CPU 密集型運算❌ 不適合❌ 不適合
錯誤示範asyncio.run(ThreadPoolExecutor())asyncio.run(requests.get())(會卡住)

asyncio 如何搭配 ThreadPoolExecutor

如果我們必須在 asyncio 內執行同步函式(如 requests.get()),可以用 ThreadPoolExecutor 來轉換。

ThreadPoolExecutorasyncio 中的角色

ThreadPoolExecutor 本質上是同步的執行緒池,但當它與 asyncio 搭配時,透過 loop.run_in_executor(),我們可以讓同步函式以非同步的方式運行,避免阻塞 asyncio

ThreadPoolExecutorasyncio 內的運作流程

sequenceDiagram
    participant EventLoop as asyncio 事件迴圈
    participant ThreadPool as ThreadPoolExecutor (執行緒池)
    participant Worker1 as 執行緒 1 (執行同步函式)
    participant Worker2 as 執行緒 2 (執行同步函式)
    
    EventLoop->>ThreadPool: 使用 loop.run_in_executor() 提交同步函式
    ThreadPool->>Worker1: 啟動執行緒 1 執行函式
    ThreadPool->>Worker2: 啟動執行緒 2 執行函式
    EventLoop->>EventLoop: 繼續執行其他 asyncio 任務 (非同步)
    Worker1-->>ThreadPool: 任務完成,返回結果
    Worker2-->>ThreadPool: 任務完成,返回結果
    ThreadPool-->>EventLoop: 回傳結果給 asyncio
    EventLoop->>EventLoop: 處理 run_in_executor() 的回傳值

這個流程發生了什麼?

  1. asyncio 事件迴圈(EventLoop)啟動
  2. asyncio 使用 loop.run_in_executor(),將 fetch() 提交給 ThreadPoolExecutor
  3. ThreadPoolExecutor 啟動新的執行緒 Worker1 來執行 fetch()
  4. fetch() 呼叫 requests.get(URL),向遠端 API 發送請求
  5. API 回應 JSON,Worker1 執行緒完成請求
  6. Worker1 將結果回傳給 ThreadPoolExecutor
  7. ThreadPoolExecutor 再將結果回傳給 asyncio 事件迴圈
  8. asyncio 事件迴圈繼續執行其他非同步任務,並處理 fetch() 的回傳值

重點整理

當我們說 ThreadPoolExecutor 本質上是同步的,意思是它本來是為了 標準的、多執行緒的阻塞式(同步)程式設計 而設計的。

它允許你在背景執行某些任務,但這些任務本身仍然是 同步執行的,只是它們跑在不同的執行緒中,不會阻塞主執行緒。

但是,asyncio 是基於單執行緒的事件迴圈,主要用於 非同步 I/O 操作,它不會直接支援 ThreadPoolExecutor 這種基於執行緒的並行機制。

因此,asyncio 提供了 run_in_executor() 這個方法,讓開發者可以把 同步(阻塞)任務放進 ThreadPoolExecutor,並透過 await 讓它在事件迴圈內執行,這樣就能讓 同步的 ThreadPoolExecutor 變得「非同步友好」

舉個例子:

假設有一個同步函式 blocking_task(),它會執行一些耗時的操作:

import time
def blocking_task():
    time.sleep(3)  # 模擬一個同步的阻塞操作
    return "Done"

這是一個標準的 同步函式,如果直接在 asyncio 內執行,它會阻塞整個事件迴圈。但我們可以用 run_in_executor() 把它丟進 ThreadPoolExecutor 來非同步執行:

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, blocking_task)
    print(result)

asyncio.run(main())

這裡 run_in_executor() 的作用就是:

  1. 把同步函式 blocking_task() 丟進 ThreadPoolExecutor 執行,讓它在單獨的執行緒裡跑。
  2. 不阻塞 asyncio 的事件迴圈,因為 awaitasyncio 可以在等待結果的同時執行其他非同步任務。

asyncio + ThreadPoolExecutor 程式碼解析

現在,我們來看看如何在 asyncio 中使用 ThreadPoolExecutor,讓同步函式不會影響 asyncio 的非同步運作。

import asyncio
import requests
from concurrent.futures import ThreadPoolExecutor

URL = "https://jsonplaceholder.typicode.com/todos/1"

def fetch():
    """🔹 這是一個同步函式"""
    return requests.get(URL).json()  # 使用 requests.get() 發送同步 HTTP 請求

async def main():
    loop = asyncio.get_running_loop()  # 取得目前的 asyncio 事件迴圈
    with ThreadPoolExecutor() as executor:  # 建立執行緒池
        result = await loop.run_in_executor(executor, fetch)  # 在執行緒池中執行同步函式
    print(result)

asyncio.run(main())

📍逐步解析

🔹1. def fetch()
def fetch():
    """🔹 這是一個同步函式"""
    return requests.get(URL).json()  # 使用 requests.get() 發送同步 HTTP 請求

fetch() 是同步函式,執行時會阻塞執行緒

  • requests.get(URL) 是同步操作,當執行時,程式會等待 API 回應後才繼續執行。
  • 如果 fetch() 直接在 asyncio 內部執行,asyncio 會被卡住,無法處理其他任務
🔹 2. loop = asyncio.get_running_loop()
loop = asyncio.get_running_loop()
  • asyncio.get_running_loop() 取得當前執行中的 asyncio 事件迴圈
  • asyncio 事件迴圈負責管理所有 async 任務,確保它們可以非同步執行。
🔹 3. 建立 ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
  • 建立一個執行緒池(ThreadPoolExecutor),讓 fetch() 在額外的執行緒內執行。
  • 避免 fetch() 直接執行在 asyncio 事件迴圈內,導致卡住其他非同步任務
🔹 4. loop.run_in_executor(executor, fetch)
result = await loop.run_in_executor(executor, fetch)
  1. fetch()ThreadPoolExecutor 內的執行緒執行
  2. asyncio 事件迴圈不會被 fetch() 卡住,仍可執行其他 async 任務
  3. fetch() 完成時,await 會取得結果,並返回給 result 變數

這樣的做法讓同步函式 fetch()asyncio 內部執行,而不會影響 asyncio 的非同步性。

🔹 5. 啟動 asyncio
asyncio.run(main())
  • 啟動 asyncio 事件迴圈,執行 main() 函式
  • main() 是非同步函式,因此 asyncio.run(main()) 負責運行整個 asyncio 非同步架構

總結

ThreadPoolExecutor 適合多執行緒並行執行同步函式,如 requests 或檔案讀寫。
asyncio 適合處理 async 函式,如 aiohttp,但不能直接執行同步函式。
可以用 ThreadPoolExecutor 讓同步函式在 asyncio 環境中運行,解決 asyncio 無法處理同步任務的問題。 🚀

Similar Posts