在开始具体介绍各种并发方法之前,这里简单总结一下选择建议:
方法 | 适用场景 | 数据传递 | 优点 | 缺点 |
threading | I/O密集型任务:如网络上传、数据库查询等,线程在等待I/O时会释放GIL,让其他线程工作 | 在主线程和工作线程之间传递的是数据的引用(内存地址)。没有数据拷贝或序列化的开销 | 创建线程开销小,所有线程共享同一个内存空间,资源利用更高效,适合处理网络请求等I/O操作 | Python的GIL限制导致CPU密集型任务无法真正并行,多线程切换也会带来额外开销 |
multiprocessing | CPU密集型任务:如大规模数学计算、视频编码等,需要利用多核CPU并行计算以绕开GIL | 开销巨大。把一个大的数据块put到队列中时,它必须被序列化(pickle),通过管道(pipe)发送到另一个进程,然后在那个进程中反序列化(unpickle) | 可以充分利用多核CPU实现真正的并行计算,每个进程有独立的GIL | 每个进程都有独立的内存副本,总体内存消耗更大,启动时间较长 |
concurrent.futures | 简单并行任务:适合批量处理独立且相似的任务,如批量文件处理、网络请求等 | 线程池模式下与threading相同,进程池模式下与multiprocessing相同,都需要进行数据序列化 | 提供统一的高级接口同时支持线程池和进程池,使用map等函数式API更直观 | 无法细粒度控制任务调度,不支持复杂的并发模式,功能相对基础 |
asyncio | I/O密集型任务:特别适合高并发场景,如Web服务器、大量网络请求、API调用等 | 协程间通过队列、事件或共享变量传递数据,没有序列化开销,数据共享高效 | 单线程实现并发,避免线程切换开销,支持大量并发连接,性能优异 | 需要使用async/await特殊语法,第三方库需要专门支持,不适合CPU密集型任务 |
joblib | 科学计算:数据分析、机器学习训练和预测、大规模数值计算等 | 默认使用多进程,数据需要序列化,但针对numpy数组等科学计算对象有特殊优化 | 针对科学计算优化,支持结果缓存和进度条显示,与numpy/scipy等库集成好 | 主要面向数据处理和机器学习场景,通用并发功能有限,不适合复杂的并发控制 |
1. threading - 多线程
适用于I/O密集型任务的简单示例,并发下载多个文件:
import threading
import requests
def download_file(url, filename):
response = requests.get(url)
with open(filename, 'wb') as f:
f.write(response.content)
print(f"Downloaded {filename}")
# 创建多个线程下载文件
urls = ['http://example.com/file1', 'http://example.com/file2']
threads = []
for i, url in enumerate(urls):
thread = threading.Thread(target=download_file, args=(url, f'file{i}.txt'))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
2. multiprocessing - 多进程
适用于CPU密集型任务的示例,并行计算数字平方:
from multiprocessing import Process, Queue
def calculate_squares(numbers, queue):
for n in numbers:
queue.put(n * n)
if __name__ == '__main__':
numbers_list = [[1,2,3], [4,5,6], [7,8,9]]
queue = Queue()
processes = []
# 创建多个进程
for numbers in numbers_list:
p = Process(target=calculate_squares, args=(numbers, queue))
processes.append(p)
p.start()
# 等待所有进程完成
for p in processes:
p.join()
# 获取结果
results = []
while not queue.empty():
results.append(queue.get())
print(results)
3. concurrent.futures - 线程池和进程池
使用线程池处理多个任务的示例:
from concurrent.futures import ThreadPoolExecutor
import time
def process_item(item):
time.sleep(1)
return f"Processed {item}"
with ThreadPoolExecutor(max_workers=3) as executor:
items = ['A', 'B', 'C', 'D', 'E']
results = list(executor.map(process_item, items))
print(results)
4. asyncio - 异步编程
异步处理多个网络请求的示例:
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
'http://example.com/1',
'http://example.com/2',
'http://example.com/3'
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# 运行异步任务
results = asyncio.run(main())
5. joblib - 并行计算
用joblib和tqdm可以实现非常好的并发数据处理:
from joblib import Parallel, delayed
from tqdm.auto import tqdm
import numpy as np
def process_array(arr):
return np.sum(arr ** 2)
# 创建示例数据
data = [np.random.rand(1000) for _ in range(100)]
# 并行处理数据
results = Parallel(n_jobs=4)(
delayed(process_array)(x) for x in tqdm(data)
)