并发方案速查表
方法 | 场景 | 数据传递 | 优势 | 坑点 |
---|---|---|---|---|
threading | I/O等待多的任务 | 共享内存,直接传引用 | 轻量,启动快 | GIL导致CPU密集型假并行,线程安全头疼 |
multiprocessing | CPU密集计算 | pickle序列化,开销大 | 真并行,各进程独立GIL | 内存翻倍,启动慢,调试难 |
concurrent.futures | 批量同质任务 | 看底层是线程还是进程 | API简洁,自带超时控制 | 功能简单,复杂调度做不了 |
asyncio | 大量I/O并发 | 协程间直接传递 | 单线程干万线程的活 | async传染性,生态割裂,调试是噩梦 |
joblib | numpy计算 | 特殊优化的序列化 | 自带缓存,进度条香 | 基本就是给数据科学用的 |
Threading 多线程
GIL 的真相
import threading
import time
import dis
# Which operations release the GIL?
def check_gil_release(path='file.txt', url=None):
    """Exercise operations that do and do not release the GIL.

    Blocking I/O (sleeping, opening files, network reads) releases the GIL so
    other threads can run meanwhile; executing pure-Python bytecode holds it.

    Args:
        path: file opened as the file-I/O example (must exist).
        url: optional URL fetched as the network-I/O example; skipped when
            None so the demo also runs offline.
    """
    # I/O operations release the GIL
    time.sleep(0.001)  # ✓ releases
    with open(path):  # ✓ releases during the OS call; `with` closes the file
        pass
    if url is not None:
        import urllib.request
        with urllib.request.urlopen(url) as resp:  # ✓ releases while waiting
            resp.read()
    # Pure-Python computation does not release the GIL
    sum(range(1000000))  # ✗ holds the GIL
    [i**2 for i in range(1000)]  # ✗ holds the GIL


# Inspect the bytecode. Since Python 3.2 threads are no longer switched every
# N bytecode instructions: a thread waiting for the GIL requests it after the
# switch interval (sys.getswitchinterval(), 5 ms by default).
dis.dis(lambda: sum(range(100)))
线程安全的数据结构
import threading
from queue import Queue, LifoQueue, PriorityQueue
from collections import deque
import time
# Thread-safe containers from the standard library.
# deque: only some operations are atomic (not all of them!) --
# append(), appendleft(), pop() and popleft() are.
safe_deque = deque()

safe_queue = Queue()        # first in, first out
safe_stack = LifoQueue()    # last in, first out
safe_pq = PriorityQueue()   # smallest entry comes out first
# A hand-rolled thread-safe counter
class ThreadSafeCounter:
    """Integer counter whose reads and increments are serialized by a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = 0

    def increment(self):
        """Add one and return the new count; the read-modify-write is atomic."""
        with self._lock:
            self._value = self._value + 1
            new_value = self._value
        return new_value

    def value(self):
        """Return the current count."""
        with self._lock:
            current = self._value
        return current
# Measure the cost of running CPU-bound work on threads under the GIL
def measure_gil_overhead(n=1000000, num_threads=4):
    """Compare a CPU-bound task run serially vs. on parallel threads.

    The task runs ``num_threads`` times back-to-back on the calling thread,
    then once on each of ``num_threads`` threads. Pure-Python bytecode holds
    the GIL, so the threaded run gains no parallelism.

    Args:
        n: length of the sum-of-squares loop in each task.
        num_threads: number of tasks (and threads in the threaded run).

    Returns:
        (single, multi): elapsed seconds of the serial and threaded runs.
    """
    def cpu_bound(n):
        return sum(i**2 for i in range(n))

    # Single thread
    start = time.perf_counter()
    for _ in range(num_threads):
        cpu_bound(n)
    single = time.perf_counter() - start

    # Multiple threads
    start = time.perf_counter()
    threads = [threading.Thread(target=cpu_bound, args=(n,)) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    multi = time.perf_counter() - start

    print(f"单线程: {single:.3f}s")
    print(f"多线程: {multi:.3f}s")
    # On a GIL build the ratio is typically around 1x or somewhat above
    # (switching overhead, no parallelism); it varies by version/machine.
    ratio = multi / single if single else float('nan')
    print(f"多线程反而慢了: {ratio:.2f}x")
    return single, multi
线程池的坑
from concurrent.futures import ThreadPoolExecutor
import threading
# Pitfall: pools reuse their threads, so thread-local state carries over
thread_locals = threading.local()


def task_with_state(task_id):
    """Increment and return the per-thread task counter.

    Thread-local storage belongs to the worker thread, not to the task, so a
    pooled thread that runs several tasks keeps accumulating the same count.
    """
    thread_locals.count = getattr(thread_locals, 'count', 0) + 1
    thread_id = threading.current_thread().ident
    print(f"Task {task_id} on thread {thread_id}, count={thread_locals.count}")
    return thread_locals.count


# The same worker thread handles several tasks
with ThreadPoolExecutor(max_workers=2) as executor:
    results = [*executor.map(task_with_state, range(10))]
# Each reused thread's count keeps climbing!
Multiprocessing 多进程
序列化开销实测
import multiprocessing as mp
import pickle
import numpy as np
import time
def measure_pickle_overhead(test_data=None):
    """Time ``pickle.dumps`` / ``pickle.loads`` for several kinds of data.

    Args:
        test_data: optional mapping of label -> object to measure. Defaults to
            a small list, a large list, a NumPy array and a nested dict.

    Returns:
        dict mapping each label to ``(size_mb, pickle_ms, unpickle_ms)``.
    """
    if test_data is None:
        test_data = {
            'small_list': list(range(100)),
            'large_list': list(range(1000000)),
            'numpy_array': np.random.rand(1000000),
            'nested_dict': {i: {j: j**2 for j in range(100)} for i in range(100)},
        }
    timings = {}
    for name, data in test_data.items():
        start = time.perf_counter()
        pickled = pickle.dumps(data)
        pickle_time = time.perf_counter() - start

        start = time.perf_counter()
        pickle.loads(pickled)  # result discarded; only the timing matters
        unpickle_time = time.perf_counter() - start

        size_mb = len(pickled) / 1024 / 1024
        print(f"{name:15} | Size: {size_mb:6.2f}MB | "
              f"Pickle: {pickle_time*1000:6.2f}ms | "
              f"Unpickle: {unpickle_time*1000:6.2f}ms")
        timings[name] = (size_mb, pickle_time * 1000, unpickle_time * 1000)
    return timings
# Shared memory avoids serialization (Python 3.8+)
def _shm_sum_worker(shm_name, shape, dtype):
    """Attach to a shared-memory block by name and return the sum of its array.

    Lives at module level because multiprocessing pickles the target function
    by qualified name; a function nested inside another one cannot be sent to
    a pool worker.
    """
    from multiprocessing import shared_memory

    existing_shm = shared_memory.SharedMemory(name=shm_name)
    try:
        arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
        result = np.sum(arr)  # reads the shared pages directly, no pickling
        # Drop the NumPy view before close(): an outstanding export of the
        # buffer can make close() fail with BufferError.
        del arr
    finally:
        existing_shm.close()
    return result


def use_shared_memory(size=10000000, processes=None):
    """Sum a random float64 array in a worker process via shared memory.

    Only the block's name, shape and dtype are sent to the worker; the array
    data itself is never serialized.

    Args:
        size: number of float64 elements to allocate.
        processes: pool size for ``multiprocessing.Pool`` (None = CPU count).

    Returns:
        The sum computed by the worker (also printed).
    """
    from multiprocessing import shared_memory

    arr = np.random.rand(size)
    # SharedMemory rejects size 0, so allocate at least one byte
    shm = shared_memory.SharedMemory(create=True, size=max(arr.nbytes, 1))
    try:
        shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
        shared_arr[:] = arr
        del shared_arr  # release the view so shm.close() below can succeed

        # Only the name and metadata are passed, not the data itself
        with mp.Pool(processes) as pool:
            async_result = pool.apply_async(_shm_sum_worker, (shm.name, arr.shape, arr.dtype))
            result = async_result.get()
        print(result)
        return result
    finally:
        shm.close()
        shm.unlink()
进程池的内存问题
import os
import psutil
import multiprocessing as mp
def memory_hungry_task(n):
    """Simulate a memory-heavy task.

    Builds a list of ``n`` ints in this process, prints the process RSS, and
    returns the list's sum.
    """
    pid = os.getpid()
    proc = psutil.Process(pid)
    numbers = list(range(n))  # deliberately large allocation
    rss_mb = proc.memory_info().rss / 1024 / 1024
    print(f"Process {pid} using {rss_mb:.2f}MB")
    return sum(numbers)
# Observe memory usage across a process pool
def monitor_memory_usage(n=10000000, processes=4):
    """Run ``memory_hungry_task`` in a pool and report the parent's RSS.

    Nothing is copied from the parent: each task builds its own list of ``n``
    ints inside whichever worker runs it, so that memory is charged to the
    workers and the parent's RSS should barely move.

    Args:
        n: list length passed to each task.
        processes: pool size; one task is submitted per process.

    Returns:
        The per-task sums returned by ``memory_hungry_task``.
    """
    # Parent process memory
    main_process = psutil.Process()
    main_memory_before = main_process.memory_info().rss / 1024 / 1024
    with mp.Pool(processes) as pool:
        results = pool.map(memory_hungry_task, [n] * processes)
    main_memory_after = main_process.memory_info().rss / 1024 / 1024
    print(f"Main process: {main_memory_before:.2f}MB -> {main_memory_after:.2f}MB")
    return results
concurrent.futures 的细节
Future 对象的妙用
from concurrent.futures import ThreadPoolExecutor, Future, as_completed, wait, FIRST_COMPLETED
import time
import random
def unpredictable_task(task_id, min_sleep=0.1, max_sleep=2.0, fail_rate=0.3):
    """Simulate a task with a random duration that sometimes fails.

    Args:
        task_id: label used in the result or exception message.
        min_sleep: lower bound of the uniformly random sleep, in seconds.
        max_sleep: upper bound of the uniformly random sleep, in seconds.
        fail_rate: probability in [0, 1] of raising instead of returning.

    Returns:
        A message with the task id and how long it slept.

    Raises:
        Exception: with probability ``fail_rate``.
    """
    sleep_time = random.uniform(min_sleep, max_sleep)
    time.sleep(sleep_time)
    if random.random() < fail_rate:
        raise Exception(f"Task {task_id} failed")
    return f"Task {task_id} completed in {sleep_time:.2f}s"
# Different waiting strategies
def future_patterns():
    """Show three ways of collecting results from ThreadPoolExecutor futures.

    1. ``as_completed``: handle each future as soon as it finishes
       (raises TimeoutError if not all finish within 5 s).
    2. ``wait(..., FIRST_COMPLETED)``: block until one finishes or 1 s passes.
    3. ``future.result(timeout=...)``: block on one specific future; a
       timeout or task failure is caught and printed.
    """
    with ThreadPoolExecutor(max_workers=5) as executor:
        # submit() returns a Future immediately
        futures = [executor.submit(unpredictable_task, i) for i in range(10)]

        # Pattern 1: as_completed -- process whichever finishes first
        print("=== as_completed ===")
        for future in as_completed(futures, timeout=5):
            try:
                result = future.result()
                print(f"✓ {result}")
            except Exception as e:
                print(f"✗ {e}")

        # Pattern 2: wait -- block until a condition is met
        futures = [executor.submit(unpredictable_task, i) for i in range(5)]
        done, not_done = wait(futures, timeout=1, return_when=FIRST_COMPLETED)
        print("\n=== wait ===")
        print(f"Completed: {len(done)}, Pending: {len(not_done)}")

        # Pattern 3: call result() directly -- blocking wait
        future = executor.submit(unpredictable_task, 99)
        try:
            result = future.result(timeout=3)
            print(f"\n=== direct result ===\n✓ {result}")
        except Exception as e:  # also catches the TimeoutError from timeout=3
            print(f"\n=== direct result ===\n✗ {e}")
# Callback mechanism
def with_callbacks():
    """Attach a completion callback to a future instead of blocking on it.

    The callback runs when the future finishes: in the worker thread that
    completed it, or immediately in this thread if the future was already
    done when ``add_done_callback`` was called.
    """
    def task_done_callback(future):
        try:
            result = future.result()
        except Exception as e:
            print(f"Callback: Task failed with {e}")
        else:
            print(f"Callback: Task completed with {result}")

    with ThreadPoolExecutor(max_workers=2) as executor:
        future = executor.submit(unpredictable_task, 1)
        future.add_done_callback(task_done_callback)
        # The main thread is free to do other work meanwhile
        print("Main thread continues...")
        # No sleep needed: leaving the `with` block calls shutdown(wait=True),
        # which blocks until the task and its callback have finished.
Asyncio 深入
事件循环的本质
import asyncio
import selectors
import socket
# On Unix, asyncio's default loop (SelectorEventLoop) is built on the
# `selectors` module, i.e. epoll / kqueue / select.
def peek_under_hood():
    """Print an event loop's selector and the file descriptors it watches.

    Uses the running loop when called from inside one; otherwise creates a
    temporary loop and closes it afterwards (calling get_event_loop() with no
    running loop is deprecated).

    Returns:
        List of ``(fd, events)`` pairs registered with the selector; empty
        when the loop has no selector (e.g. ProactorEventLoop on Windows).
    """
    try:
        loop = asyncio.get_running_loop()
        owns_loop = False
    except RuntimeError:
        loop = asyncio.new_event_loop()
        owns_loop = True
    try:
        # Private attribute, only present on selector-based loops -- not for production use
        selector = getattr(loop, '_selector', None)
        if selector is None:
            print(f"{type(loop).__name__} has no selector")
            return []
        print(f"Using selector: {type(selector)}")  # EpollSelector on Linux
        # get_map() maps file object -> SelectorKey(fileobj, fd, events, data)
        registered = []
        for key in selector.get_map().values():
            print(f"FD: {key.fd}, Events: {key.events}")
            registered.append((key.fd, key.events))
        return registered
    finally:
        if owns_loop:
            loop.close()
# Custom event loop subclass (not a "policy") that logs every iteration
class DebugEventLoop(asyncio.SelectorEventLoop):
    """Selector event loop that prints a line on every loop iteration.

    Subclasses SelectorEventLoop rather than BaseEventLoop: BaseEventLoop sets
    up no selector, so its ``_run_once`` fails as soon as the loop runs.
    ``_run_once`` is a private CPython hook and may change between versions,
    so this is for debugging/demo use only.
    """

    def _run_once(self):
        print("Event loop tick")
        super()._run_once()
# How coroutines get scheduled
async def inspect_scheduling():
    """Start three tasks, keep working, then collect their results.

    ``create_task`` schedules each coroutine immediately; the tasks begin
    running as soon as this coroutine yields at its own ``await``, before
    ``gather`` is ever called.

    Returns:
        ``["A", "B", "C"]``; gather keeps submission order, not finish order.
    """
    async def task(name, delay):
        print(f"{name} started")
        await asyncio.sleep(delay)
        print(f"{name} finished")
        return name

    plan = (("A", 0.5), ("B", 0.3), ("C", 0.4))
    # Create the tasks without awaiting them
    running = [asyncio.create_task(task(label, pause)) for label, pause in plan]

    # The tasks are already running!
    await asyncio.sleep(0.1)
    print("Main coroutine continues while tasks run")

    # Now wait for all of them
    return await asyncio.gather(*running)
异步上下文管理器和迭代器
class AsyncResource:
    """Async context manager that simulates slow acquire/release of a resource."""

    def __init__(self, delay=0.5):
        # Seconds awaited in both __aenter__ and __aexit__
        self.delay = delay
        # Defined up front so reading it outside the `async with` never raises
        self.resource = None

    async def __aenter__(self):
        print("Acquiring resource...")
        await asyncio.sleep(self.delay)
        self.resource = "Resource acquired"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Releasing resource...")
        await asyncio.sleep(self.delay)
        self.resource = None
        # Returns None (falsy), so exceptions raised in the body propagate
class AsyncDataStream:
    """Async iterator yielding "Data chunk 1" .. "Data chunk N", one per await.

    Args:
        max_items: number of chunks to produce.
        delay: seconds awaited before each chunk, simulating an async fetch.
    """

    def __init__(self, max_items=5, delay=0.2):
        self.max_items = max_items
        self.delay = delay
        self.current = 0

    def __aiter__(self):
        # The stream is its own iterator, so it can be consumed only once
        return self

    async def __anext__(self):
        if self.current >= self.max_items:
            raise StopAsyncIteration
        await asyncio.sleep(self.delay)
        self.current += 1
        return f"Data chunk {self.current}"
async def use_async_patterns():
    """Drive AsyncResource with `async with` and AsyncDataStream with `async for`."""
    # Asynchronous context manager
    async with AsyncResource() as res:
        print(f"Using {res.resource}")

    # Asynchronous iterator
    stream = AsyncDataStream()
    async for piece in stream:
        print(f"Processing {piece}")
并发控制
import asyncio
from asyncio import Semaphore, Lock, Event, Condition
class RateLimiter:
    """Token-bucket limiter: at most ``rate`` acquisitions per ``per`` seconds.

    Tokens refill continuously at ``rate / per`` per second, up to a burst of
    ``rate``. A caller that finds the bucket empty reserves its token before
    awaiting (``allowance`` goes negative) and sleeps until that token has
    accrued, so concurrent waiters queue up one after another instead of all
    waking at once. Meant to be used from a single event loop.

    Raises:
        ValueError: if ``rate`` or ``per`` is not positive.
    """

    def __init__(self, rate, per):
        if rate <= 0 or per <= 0:
            raise ValueError("rate and per must be positive")
        self.rate = rate
        self.per = per
        self.allowance = rate
        # Set on the first acquire() from the running loop, so constructing
        # the limiter does not need an event loop.
        self.last_check = None

    async def acquire(self):
        """Wait until one more request may proceed under the rate limit."""
        now = asyncio.get_running_loop().time()
        if self.last_check is not None:
            self.allowance += (now - self.last_check) * (self.rate / self.per)
            if self.allowance > self.rate:
                self.allowance = self.rate
        self.last_check = now
        # Take the token now (no await in between), so later callers see the
        # debt and wait proportionally longer.
        self.allowance -= 1.0
        if self.allowance < 0:
            await asyncio.sleep(-self.allowance * (self.per / self.rate))
async def controlled_concurrency():
    """Demo: cap concurrency with a Semaphore, then throttle with RateLimiter."""
    # A semaphore admits at most 3 tasks at a time
    gate = Semaphore(3)

    async def limited_task(task_no):
        async with gate:
            print(f"Task {task_no} running")
            await asyncio.sleep(1)

    # Only 3 of these run concurrently
    await asyncio.gather(*(limited_task(k) for k in range(10)))

    # Rate limiting: 5 requests per second
    limiter = RateLimiter(rate=5, per=1)

    async def rate_limited_task(req_no):
        await limiter.acquire()
        now = asyncio.get_running_loop().time()
        print(f"Request {req_no} sent at {now:.2f}")

    await asyncio.gather(*(rate_limited_task(k) for k in range(10)))
Joblib 实用技巧
缓存机制
from joblib import Memory
import numpy as np
import time
# Disk-backed cache: joblib.Memory stores results under ./cache_dir, so they
# persist across runs (this is not an in-memory cache).
memory = Memory('./cache_dir', verbose=1)


@memory.cache
def expensive_computation(n):
    """Return an n x n random matrix after a slow (2 s) "computation".

    Results are cached per argument value, so a second call with the same
    ``n`` returns the stored array -- the same random values -- without
    running the body again.
    """
    print(f"Computing for n={n}...")
    time.sleep(2)
    return np.random.rand(n, n)


# First call: computes and stores the result
result1 = expensive_computation(1000)
# Second call: loaded from the cache (no "Computing..." output, no sleep)
result2 = expensive_computation(1000)

# Remove everything stored under ./cache_dir
memory.clear(warn=False)


# Excluding an argument from the cache key
@memory.cache(ignore=['verbose'])
def flexible_function(data, verbose=False):
    """Return ``data.mean()``; ``verbose`` is excluded from the cache key.

    Calls that differ only in ``verbose`` share one cache entry, so on a
    cache hit the body (including its print) is skipped entirely.
    """
    if verbose:
        print("Processing...")
    return data.mean()
内存映射优化
from joblib import Parallel, delayed, dump, load
import numpy as np
def process_large_array(path='large_array.dat', shape=(100000, 1000),
                        chunk_size=10000, n_jobs=4):
    """Compute per-chunk means of a large disk-backed array in parallel.

    The data lives in an ``np.memmap`` file, so only the pages being touched
    need to be in RAM. The file is filled chunk by chunk: generating the
    whole random matrix at once would materialise it in memory (as float64,
    twice the float32 file size) and defeat the point of memory mapping.

    Args:
        path: file backing the memmap (created or overwritten).
        shape: ``(rows, cols)`` of the float32 array.
        chunk_size: rows per chunk, used both for filling and for the jobs.
        n_jobs: number of joblib workers.

    Returns:
        List with the mean of each row chunk, in order.
    """
    rows, cols = shape
    large_array = np.memmap(path, dtype='float32', mode='w+', shape=shape)
    for start in range(0, rows, chunk_size):
        stop = min(start + chunk_size, rows)
        large_array[start:stop] = np.random.rand(stop - start, cols)
    large_array.flush()  # make sure the data is on disk before workers read it

    def process_chunk(arr, start, end):
        # Only this slice is read, keeping per-worker memory small
        return arr[start:end].mean()

    # joblib hands np.memmap arguments to worker processes as references to
    # the backing file instead of pickling the data
    results = Parallel(n_jobs=n_jobs)(
        delayed(process_chunk)(large_array, i, i + chunk_size)
        for i in range(0, rows, chunk_size)
    )
    return results
# Persisting NumPy-heavy data with joblib
def efficient_persistence(path='data.joblib', compress=3):
    """Round-trip a dict of NumPy arrays through ``joblib.dump`` / ``joblib.load``.

    joblib handles large NumPy arrays efficiently and can compress the file.
    Compression (levels 0-9) trades CPU time for a smaller file, so with
    ``compress > 0`` this is not necessarily faster than plain pickle. Note
    that random floats like these compress poorly.

    Args:
        path: destination file.
        compress: joblib compression level, 0 (off) to 9.

    Returns:
        The data loaded back from ``path``.
    """
    data = {'arrays': [np.random.rand(1000, 1000) for _ in range(10)]}
    dump(data, path, compress=compress)
    loaded_data = load(path)
    return loaded_data
性能对比实验
import time
import asyncio
import requests
import aiohttp
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from joblib import Parallel, delayed
def _io_task(n):
    """Simulated I/O-bound task: sleep 10 ms, then return n squared."""
    time.sleep(0.01)
    return n ** 2


def _cpu_task(n):
    """CPU-bound task: return the sum of i**2 for i in range(n)."""
    return sum(i ** 2 for i in range(n))


def benchmark_all(task_count=100):
    """Benchmark serial/thread execution on I/O work and process/joblib on CPU work.

    The task functions are defined at module level on purpose: the process
    pool pickles the callable by its qualified name, so a function nested
    inside this one could not be sent to worker processes.

    Args:
        task_count: number of I/O tasks for the serial and thread-pool runs.

    Returns:
        dict mapping each benchmark label to elapsed seconds.
    """
    results = {}

    # Serial baseline (I/O)
    start = time.perf_counter()
    for i in range(task_count):
        _io_task(i)
    results['Serial I/O'] = time.perf_counter() - start

    # Thread pool (I/O)
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=10) as executor:
        list(executor.map(_io_task, range(task_count)))
    results['ThreadPool I/O'] = time.perf_counter() - start

    # Process pool (CPU)
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        list(executor.map(_cpu_task, [10000] * 20))
    results['ProcessPool CPU'] = time.perf_counter() - start

    # joblib (CPU)
    start = time.perf_counter()
    Parallel(n_jobs=4)(delayed(_cpu_task)(10000) for _ in range(20))
    results['Joblib CPU'] = time.perf_counter() - start

    for name, duration in results.items():
        print(f"{name:20} {duration:.3f}s")
    return results
实战踩坑记录
asyncio 和 threading 混用
# Often labelled "wrong": running an event loop inside a worker thread
def wrong_way():
    """Run ``some_async_function()`` via ``asyncio.run()`` in a new thread.

    NOTE: this pattern is actually fine. ``asyncio.run()`` creates, runs and
    closes a fresh loop, and a newly started thread has no running loop.
    The real pitfalls when mixing threads and asyncio are:

    * driving one loop from several threads (use
      ``asyncio.run_coroutine_threadsafe`` / ``loop.call_soon_threadsafe``);
    * calling ``asyncio.run()`` in a thread whose loop is already running,
      which raises RuntimeError.

    Returns:
        The started ``threading.Thread``; ``join()`` it to wait for completion.
    """
    def thread_target():
        asyncio.run(some_async_function())

    thread = threading.Thread(target=thread_target)
    thread.start()
    return thread
# Explicit loop management inside a worker thread
def right_way():
    """Run ``some_async_function()`` on a new event loop owned by a new thread.

    The loop is created, installed as that thread's current loop, run to
    completion, and always closed, even if the coroutine raises.

    Returns:
        The started ``threading.Thread``; ``join()`` it to wait for completion.
    """
    def thread_target():
        # Create a new event loop for this thread
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(some_async_function())
        finally:
            asyncio.set_event_loop(None)
            loop.close()

    thread = threading.Thread(target=thread_target)
    thread.start()
    return thread
multiprocessing 在 Windows/macOS 上的坑(spawn 启动方式)
# This guard is required whenever child processes are started with "spawn"
# (the default on Windows and macOS) or "forkserver": each child imports the
# main module again, and the check keeps that import from re-running the
# multiprocessing code.
if __name__ == '__main__':
    # multiprocessing code goes here
    pass
# Without the guard, a child would try to start new processes while it is
# still bootstrapping; multiprocessing detects this and raises RuntimeError
# rather than recursing forever.
死锁检测
import threading
import time
def detect_deadlock(timeout=1):
    """Contrast lock-ordering deadlock with timeout-based lock acquisition.

    ``thread1`` and ``thread2`` take the same two locks in opposite order.
    Run concurrently, each can end up holding one lock while waiting forever
    for the other. They are defined for illustration only and are not started.

    ``safe_thread1`` acquires with a timeout and backs off instead of hanging.
    To show this deterministically, it is run once while ``lock2`` is held
    (so acquiring ``lock2`` times out) and once with both locks free.

    Args:
        timeout: seconds each ``acquire()`` waits before giving up.

    Returns:
        ``(acquired_while_contended, acquired_when_free)``, i.e. ``(False, True)``.
    """
    lock1 = threading.Lock()
    lock2 = threading.Lock()

    def thread1():
        with lock1:
            time.sleep(0.1)
            with lock2:
                pass

    def thread2():
        with lock2:
            time.sleep(0.1)
            with lock1:  # deadlock if thread1 holds lock1 and waits for lock2!
                pass

    # Use timeouts to avoid blocking forever
    def safe_thread1():
        """Take lock1 then lock2; return True on success, False on timeout."""
        if not lock1.acquire(timeout=timeout):
            return False
        try:
            if not lock2.acquire(timeout=timeout):
                return False  # back off; the finally below releases lock1
            try:
                return True  # the critical section would go here
            finally:
                lock2.release()
        finally:
            lock1.release()

    # threading.Lock is not reentrant, so holding lock2 here makes
    # safe_thread1's attempt on lock2 time out, as if another thread held it.
    with lock2:
        contended = safe_thread1()
    free = safe_thread1()
    return contended, free
总结思考
GIL不是缺陷而是设计取舍:它保证了CPython内存管理(如引用计数)的线程安全,代价是纯Python的CPU密集型任务无法靠多线程并行(Python 3.13起提供可选的free-threaded构建)
async不是银弹:代码复杂度上升,调试困难,生态分裂
进程不是越多越好:考虑内存和启动开销
共享内存很危险:能不共享就不共享,要共享就用队列
性能不是唯一指标:可维护性、可调试性同样重要
最近在生产环境的经验:
Web服务用asyncio(FastAPI)
数据处理用joblib
简单并发用concurrent.futures
实时系统避免Python(GIL是硬伤)