在开始具体介绍各种并发方法之前,这里简单总结一下选择建议:

方法适用场景数据传递优点缺点
threadingI/O密集型任务:如网络上传、数据库查询等,线程在等待I/O时会释放GIL,让其他线程工作在主线程和工作线程之间传递的是数据的引用(内存地址)。没有数据拷贝或序列化的开销创建线程开销小,所有线程共享同一个内存空间,资源利用更高效,适合处理网络请求等I/O操作Python的GIL限制导致CPU密集型任务无法真正并行,多线程切换也会带来额外开销
multiprocessingCPU密集型任务:如大规模数学计算、视频编码等,需要利用多核CPU并行计算以绕开GIL开销巨大。把一个大的数据块put到队列中时,它必须被序列化(pickle),通过管道(pipe)发送到另一个进程,然后在那个进程中反序列化(unpickle)可以充分利用多核CPU实现真正的并行计算,每个进程有独立的GIL每个进程都有独立的内存副本,总体内存消耗更大,启动时间较长
concurrent.futures简单并行任务:适合批量处理独立且相似的任务,如批量文件处理、网络请求等线程池模式下与threading相同,进程池模式下与multiprocessing相同,都需要进行数据序列化提供统一的高级接口同时支持线程池和进程池,使用map等函数式API更直观无法细粒度控制任务调度,不支持复杂的并发模式,功能相对基础
asyncioI/O密集型任务:特别适合高并发场景,如Web服务器、大量网络请求、API调用等协程间通过队列、事件或共享变量传递数据,没有序列化开销,数据共享高效单线程实现并发,避免线程切换开销,支持大量并发连接,性能优异需要使用async/await特殊语法,第三方库需要专门支持,不适合CPU密集型任务
joblib科学计算:数据分析、机器学习训练和预测、大规模数值计算等默认使用多进程,数据需要序列化,但针对numpy数组等科学计算对象有特殊优化针对科学计算优化,支持结果缓存和进度条显示,与numpy/scipy等库集成好主要面向数据处理和机器学习场景,通用并发功能有限,不适合复杂的并发控制

1. threading - 多线程

适用于I/O密集型任务的简单示例,并发下载多个文件:

import threading
import requests

def download_file(url, filename):
    response = requests.get(url)
    with open(filename, 'wb') as f:
        f.write(response.content)
    print(f"Downloaded {filename}")

# 创建多个线程下载文件
urls = ['http://example.com/file1', 'http://example.com/file2']
threads = []
for i, url in enumerate(urls):
    thread = threading.Thread(target=download_file, args=(url, f'file{i}.txt'))
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

2. multiprocessing - 多进程

适用于CPU密集型任务的示例,并行计算数字平方:

from multiprocessing import Process, Queue

def calculate_squares(numbers, queue):
    for n in numbers:
        queue.put(n * n)

if __name__ == '__main__':
    numbers_list = [[1,2,3], [4,5,6], [7,8,9]]
    queue = Queue()
    processes = []
    
    # 创建多个进程
    for numbers in numbers_list:
        p = Process(target=calculate_squares, args=(numbers, queue))
        processes.append(p)
        p.start()
    
    # 等待所有进程完成
    for p in processes:
        p.join()
    
    # 获取结果
    results = []
    while not queue.empty():
        results.append(queue.get())
    print(results)

3. concurrent.futures - 线程池和进程池

使用线程池处理多个任务的示例:

from concurrent.futures import ThreadPoolExecutor
import time

def process_item(item):
    time.sleep(1)
    return f"Processed {item}"

with ThreadPoolExecutor(max_workers=3) as executor:
    items = ['A', 'B', 'C', 'D', 'E']
    results = list(executor.map(process_item, items))
    
print(results)

4. asyncio - 异步编程

异步处理多个网络请求的示例:

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'http://example.com/1',
        'http://example.com/2',
        'http://example.com/3'
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# 运行异步任务
results = asyncio.run(main())

5. joblib - 并行计算

用joblib和tqdm可以实现非常好的并发数据处理:

from joblib import Parallel, delayed
from tqdm.auto import tqdm
import numpy as np

def process_array(arr):
    return np.sum(arr ** 2)

# 创建示例数据
data = [np.random.rand(1000) for _ in range(100)]

# 并行处理数据
results = Parallel(n_jobs=4)(
    delayed(process_array)(x) for x in tqdm(data)
)